1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-03-21 20:59:10 +00:00

[PR #11536/dae2157b backport][stable-12] merge_variables: extended merge capabilities added (#11626)

merge_variables: extended merge capabilities added (#11536)

* merge_variables: extended merge capabilities added

This extension gives you more control over the variable merging process of the lookup plugin `merge_variables`. It closes the gap between Puppet's Hiera merging capabilities and the limitations of Ansible's default variable plugin `host_group_vars` regarding fragment-based value definition. You can now decide which merge strategy should be applied to dicts, lists, and other types. Furthermore, you can specify a merge strategy that should be applied in case of type conflicts.

The default behavior of the plugin has been preserved so that it is fully backward-compatible with the already implemented state.



* Update changelogs/fragments/11536-merge-variables-extended-merging-capabilities.yml



* Update plugins/lookup/merge_variables.py



* Periods added at the end of each choice description



* Update plugins/lookup/merge_variables.py



* ref: follow project standard for choice descriptions



* ref: more examples added and refactoring



* Update plugins/lookup/merge_variables.py



* ref: some more comments to examples added



* fix: unused import removed



* ref: re-add "merge" to strategy map



* Update comments



* Specification of transformations solely as string



* Comments updated



* ref: `append_rp` and `prepend_rp` removed
feat: options dict for list transformations re-added
feat: allow setting `keep` for dedup transformation with possible values: `first` (default) and `last`



* ref: improve options documentation



* ref: documentation improved, avoiding words like newer or older in merge description



* Update plugins/lookup/merge_variables.py



* ref: "prio" replaced by "dict"



* feat: two integration tests added



---------





(cherry picked from commit dae2157bb7)

Signed-off-by: Fiehe Christoph  <c.fiehe@eurodata.de>
Co-authored-by: Christoph Fiehe <cfiehe@users.noreply.github.com>
Co-authored-by: Fiehe Christoph <c.fiehe@eurodata.de>
Co-authored-by: Felix Fontein <felix@fontein.de>
Co-authored-by: Mark <40321020+m-a-r-k-e@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2026-03-19 22:59:56 +01:00 committed by GitHub
parent deb9d63783
commit e7e9cf97e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 989 additions and 42 deletions

2
.github/BOTMETA.yml vendored
View file

@ -312,7 +312,7 @@ files:
$lookups/lmdb_kv.py: $lookups/lmdb_kv.py:
maintainers: jpmens maintainers: jpmens
$lookups/merge_variables.py: $lookups/merge_variables.py:
maintainers: rlenferink m-a-r-k-e alpex8 maintainers: rlenferink m-a-r-k-e alpex8 cfiehe
$lookups/onepass: $lookups/onepass:
labels: onepassword labels: onepassword
maintainers: samdoran maintainers: samdoran

View file

@ -0,0 +1,2 @@
minor_changes:
- merge_variables lookup plugin - extended merging capabilities added (https://github.com/ansible-collections/community.general/pull/11536).

View file

@ -9,6 +9,7 @@ author:
- Roy Lenferink (@rlenferink) - Roy Lenferink (@rlenferink)
- Mark Ettema (@m-a-r-k-e) - Mark Ettema (@m-a-r-k-e)
- Alexander Petrenz (@alpex8) - Alexander Petrenz (@alpex8)
- Christoph Fiehe (@cfiehe)
name: merge_variables name: merge_variables
short_description: Merge variables whose names match a given pattern short_description: Merge variables whose names match a given pattern
description: description:
@ -66,6 +67,81 @@ options:
type: list type: list
elements: str elements: str
version_added: 8.5.0 version_added: 8.5.0
dict_merge:
description:
- Behavior when encountering dictionary values.
type: str
default: deep
choices:
deep: Merge dictionaries recursively.
shallow: Merge only top-level values.
replace: Overwrite existing dictionaries.
keep: Keep existing dictionaries.
version_added: 12.5.0
list_merge:
description:
- Behavior when encountering list values.
type: str
default: append
choices:
replace: Overwrite existing lists.
keep: Keep existing lists.
append: Append elements to the end of an existing list.
prepend: Insert elements at the beginning of an existing list.
merge: Take the index as key and merge each element.
version_added: 12.5.0
type_conflict_merge:
description:
- Merge strategy to apply on type conflicts.
- This value is only considered in case of O(override=warn) or O(override=ignore).
type: str
default: replace
choices:
replace: Overwrite existing values.
keep: Keep existing values.
version_added: 12.5.0
default_merge:
description:
- Merge strategy applied to other types.
- This value is only considered in case of O(override=warn) or O(override=ignore).
type: str
default: replace
choices:
replace: Overwrite existing values.
keep: Keep existing values.
version_added: 12.5.0
list_transformations:
description:
- List transformations applied to list types.
- The definition order corresponds to the order in which these transformations are applied.
- Elements can be a dict with the keys mentioned below or a string naming the transformation to apply.
type: list
elements: raw
suboptions:
name:
description:
- Name of the list transformation.
required: true
type: str
choices:
flatten: Flatten lists, converting nested lists into single lists. Does not support any additional options.
dedup: Remove duplicates from lists. Supported option is C(keep).
options:
description:
- Options as key value pairs.
type: dict
suboptions:
keep:
description:
- Determines which duplicates (if any) to keep.
- Only valid in combination with the O(list_transformations[].name=dedup) list transformation.
type: str
default: first
choices:
first: Drop duplicates except for the first occurrence.
last: Drop duplicates except for the last occurrence.
default: []
version_added: 12.5.0
""" """
EXAMPLES = r""" EXAMPLES = r"""
@ -88,6 +164,32 @@ testb__test_dict:
ports: ports:
- 3 - 3
testa_low_dict__test:
a:
a:
x: low_value
y: low_value
list:
- low_value
b:
- 1
- 1
- 2
- 3
testb_high_dict__test:
a:
a:
y: high_value
z: high_value
list:
- high_value
b:
- 3
- 4
- 4
- '5': value
# Merge variables that end with '__test_dict' and store the result in a variable 'example_a' # Merge variables that end with '__test_dict' and store the result in a variable 'example_a'
example_a: "{{ lookup('community.general.merge_variables', '__test_dict', pattern_type='suffix') }}" example_a: "{{ lookup('community.general.merge_variables', '__test_dict', pattern_type='suffix') }}"
@ -105,6 +207,56 @@ example_b: "{{ lookup('community.general.merge_variables', '^.+__test_list$', in
# - "list init item 2" # - "list init item 2"
# - "test a item 1" # - "test a item 1"
# - "test b item 1" # - "test b item 1"
# Shallow merge variables that end with '__test', insert list elements at the beginning of the list,
# remove duplicates from lists (keep the first occurrence), and store the result in a variable 'example_c'
example_c: "{{
lookup(
'community.general.merge_variables',
'^.+__test$',
dict_merge='shallow',
list_merge='prepend',
list_transformations=['dedup']) }}"
# The variable example_c now contains:
# a:
# a:
# y: high_value
# z: high_value
# list:
# - high_value
# b:
# - 3
# - 4
# - '5': value
# - 1
# - 2
# Deep merge variables that end with '__test', merge list elements by index, ignore overrides, apply the strategies
# 'keep' for type conflicts and 'replace' for other types, and store the result in a variable 'example_d'
example_d: "{{
lookup(
'community.general.merge_variables',
'^.+__test$',
dict_merge='deep',
list_merge='merge',
override='ignore',
type_conflict_merge='keep',
default_merge='replace') }}"
# The variable example_d now contains:
# a:
# a:
# x: low_value
# y: high_value
# list:
# - high_value
# z: high_value
# b:
# - 3
# - 4
# - 4
# - 3
""" """
RETURN = r""" RETURN = r"""
@ -114,16 +266,23 @@ _raw:
elements: raw elements: raw
""" """
import re import re
import typing as t
from abc import ABC, abstractmethod
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.plugins.lookup import LookupBase from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display from ansible.utils.display import Display
if t.TYPE_CHECKING:
from collections.abc import Callable
display = Display() display = Display()
def _verify_and_get_type(variable): def _verify_and_get_type(variable: t.Any) -> str:
if isinstance(variable, list): if isinstance(variable, list):
return "list" return "list"
elif isinstance(variable, dict): elif isinstance(variable, dict):
@ -133,12 +292,17 @@ def _verify_and_get_type(variable):
class LookupModule(LookupBase): class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs): def run(self, terms: list[str], variables: dict[str, t.Any], **kwargs) -> list[t.Any]:
self.set_options(direct=kwargs) self.set_options(direct=kwargs)
initial_value = self.get_option("initial_value", None) initial_value = self.get_option("initial_value", None)
self._override = self.get_option("override", "error") self._override_behavior = self.get_option("override", "error")
self._pattern_type = self.get_option("pattern_type", "regex") self._pattern_type = self.get_option("pattern_type", "regex")
self._groups = self.get_option("groups", None) self._groups = self.get_option("groups", None)
self._dict_merge = self.get_option("dict_merge", "deep")
self._list_merge = self.get_option("list_merge", "append")
self._type_conflict_merge = self.get_option("type_conflict_merge", "replace")
self._default_merge = self.get_option("default_merge", "replace")
self._list_transformations = self.get_option("list_transformations", [])
ret = [] ret = []
for term in terms: for term in terms:
@ -158,25 +322,25 @@ class LookupModule(LookupBase):
return ret return ret
def _is_host_in_allowed_groups(self, host_groups): def _is_host_in_allowed_groups(self, host_groups: list[str]) -> bool:
if "all" in self._groups: if "all" in self._groups:
return True return True
group_intersection = [host_group_name for host_group_name in host_groups if host_group_name in self._groups] group_intersection = [host_group_name for host_group_name in host_groups if host_group_name in self._groups]
return bool(group_intersection) return bool(group_intersection)
def _var_matches(self, key, search_pattern): def _var_matches(self, key: str, search_pattern: str) -> bool:
if self._pattern_type == "prefix": if self._pattern_type == "prefix":
return key.startswith(search_pattern) return key.startswith(search_pattern)
elif self._pattern_type == "suffix": elif self._pattern_type == "suffix":
return key.endswith(search_pattern) return key.endswith(search_pattern)
elif self._pattern_type == "regex": elif self._pattern_type == "regex":
matcher = re.compile(search_pattern) matcher = re.compile(search_pattern)
return matcher.search(key) return matcher.search(key) is not None
return False return False
def _merge_vars(self, search_pattern, initial_value, variables): def _merge_vars(self, search_pattern: str, initial_value: t.Any, variables: dict[str, t.Any]) -> t.Any:
display.vvv(f"Merge variables with {self._pattern_type}: {search_pattern}") display.vvv(f"Merge variables with {self._pattern_type}: {search_pattern}")
var_merge_names = sorted([key for key in variables.keys() if self._var_matches(key, search_pattern)]) var_merge_names = sorted([key for key in variables.keys() if self._var_matches(key, search_pattern)])
display.vvv(f"The following variables will be merged: {var_merge_names}") display.vvv(f"The following variables will be merged: {var_merge_names}")
@ -187,11 +351,42 @@ class LookupModule(LookupBase):
prev_var_type = _verify_and_get_type(initial_value) prev_var_type = _verify_and_get_type(initial_value)
result = initial_value result = initial_value
builder = (
MergerBuilder()
.with_type_strategy(list, ListMergeStrategies.from_name(self._list_merge))
.with_type_conflict_strategy(BaseMergeStrategies.from_name(self._type_conflict_merge))
.with_default_strategy(BaseMergeStrategies.from_name(self._default_merge))
.with_override_behavior(self._override_behavior)
)
if self._dict_merge == "deep":
builder.with_type_strategy(dict, DictMergeStrategies.Merge())
elif self._dict_merge == "shallow":
builder.with_shallow_merge()
else:
builder.with_type_strategy(dict, DictMergeStrategies.from_name(self._dict_merge))
for transformation in self._list_transformations:
if isinstance(transformation, str):
builder.with_transformation(list, ListTransformations.from_name(transformation))
elif isinstance(transformation, dict):
name = transformation["name"]
options = transformation.get("options", {})
builder.with_transformation(list, ListTransformations.from_name(name, **options))
else:
raise AnsibleError(
f"Transformations must be specified through values of type 'str' or 'dict', but a value of type '{type(transformation)}' was given"
)
merger = builder.build()
if self._templar is None:
raise AnsibleError("Templar is not available")
templar = self._templar.copy_with_new_env(available_variables=variables)
for var_name in var_merge_names: for var_name in var_merge_names:
temp_templar = self._templar.copy_with_new_env( var_value = templar.template(variables[var_name]) # Render jinja2 templates
available_variables=variables
) # tmp. switch renderer to context of current variables
var_value = temp_templar.template(variables[var_name]) # Render jinja2 templates
var_type = _verify_and_get_type(var_value) var_type = _verify_and_get_type(var_value)
if prev_var_type is None: if prev_var_type is None:
@ -203,29 +398,271 @@ class LookupModule(LookupBase):
result = var_value result = var_value
continue continue
if var_type == "dict": result = merger.merge(path=[var_name], left=result, right=var_value)
result = self._merge_dict(var_value, result, [var_name])
else: # var_type == "list"
result += var_value
return result return result
def _merge_dict(self, src, dest, path):
for key, value in src.items(): class MergeStrategy(ABC):
if isinstance(value, dict): @abstractmethod
node = dest.setdefault(key, {}) def merge(self, merger: ObjectMerger, path: list[str], left: t.Any, right: t.Any) -> t.Any:
self._merge_dict(value, node, path + [key]) raise NotImplementedError
elif isinstance(value, list) and key in dest:
dest[key] += value
class MergeStrategies:
@classmethod
def from_name(cls, name: str, *args, **kwargs) -> MergeStrategy:
if name in cls.strategies:
return cls.strategies[name](*args, **kwargs)
else:
raise AnsibleError(f"Unknown merge strategy '{name}'")
strategies: dict[str, Callable[..., t.Any]] = {}
class BaseMergeStrategies(MergeStrategies):
class Replace(MergeStrategy):
def merge(self, merger: ObjectMerger, path: list[str], left: t.Any, right: t.Any) -> t.Any:
return right
class Keep(MergeStrategy):
def merge(self, merger: ObjectMerger, path: list[str], left: t.Any, right: t.Any) -> t.Any:
return left
strategies = {
"replace": Replace,
"keep": Keep,
}
class DictMergeStrategies(MergeStrategies):
class Merge(MergeStrategy):
def merge(
self, merger: ObjectMerger, path: list[str], left: dict[str, t.Any], right: dict[str, t.Any]
) -> dict[str, t.Any]:
result = left
for key, value in right.items():
if key not in result:
result[key] = value
else:
path.append(key)
merged = merger.merge(path=path, left=result[key], right=value)
merger.before_override(path=path, old_value=result[key], new_value=merged)
result[key] = merged
return result
strategies = {
"merge": Merge,
"replace": BaseMergeStrategies.Replace,
"keep": BaseMergeStrategies.Keep,
}
class ListMergeStrategies(MergeStrategies):
class Append(MergeStrategy):
def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]:
return left + right
class Prepend(MergeStrategy):
def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]:
return right + left
class Merge(MergeStrategy):
def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]:
result = left
for i, value in enumerate(right):
if i >= len(result):
result.extend(right[i:])
break
path.append(str(i))
merged = merger.merge(path=path, left=result[i], right=value)
merger.before_override(path=path, old_value=result[i], new_value=merged)
result[i] = merged
return result
strategies = {
"append": Append,
"prepend": Prepend,
"merge": Merge,
"replace": BaseMergeStrategies.Replace,
"keep": BaseMergeStrategies.Keep,
}
class Transformation(ABC):
@abstractmethod
def transform(self, merger: ObjectMerger, value: t.Any) -> t.Any:
raise NotImplementedError
class Transformations:
@classmethod
def from_name(cls, name: str, *args, **kwargs) -> Transformation:
if name in cls.transformations:
return cls.transformations[name](*args, **kwargs)
else:
raise AnsibleError(f"Unknown transformation '{name}'")
transformations: dict[str, Callable[..., t.Any]] = {}
class ListTransformations(Transformations):
class Dedup(Transformation):
def __init__(self, keep: str = "first"):
self.keep = keep
def transform(self, merger: ObjectMerger, value: list[t.Any]) -> list[t.Any]:
result = []
if self.keep == "first":
for item in value:
if item not in result:
result.append(item)
elif self.keep == "last":
for item in reversed(value):
if item not in result:
result.insert(0, item)
else: else:
if (key in dest) and dest[key] != value: raise AnsibleError(
msg = f"The key '{key}' with value '{dest[key]}' will be overwritten with value '{value}' from '{'.'.join(path)}.{key}'" f"Unsupported 'keep' value for deduplication transformation. Given was '{self.keep}', but must be either 'first' or 'last'"
)
if self._override == "error": return result
raise AnsibleError(msg)
if self._override == "warn":
display.warning(msg)
dest[key] = value class Flatten(Transformation):
def transform(self, merger: ObjectMerger, value: list[t.Any]) -> list[t.Any]:
result = []
for item in value:
if isinstance(item, list):
result.extend(self.transform(merger, item))
else:
result.append(item)
return result
return dest transformations = {
"dedup": Dedup,
"flatten": Flatten,
}
class MergerBuilder:
def __init__(self) -> None:
self.type_strategies: list[tuple[type, MergeStrategy]] = []
self.type_conflict_strategy: MergeStrategy = BaseMergeStrategies.Replace()
self.default_strategy: MergeStrategy = BaseMergeStrategies.Replace()
self.transformations: list[tuple[type, Transformation]] = []
self.override_behavior: str = "error"
self.shallow_merge: bool = False
def with_shallow_merge(self, shallow_merge: bool = True) -> MergerBuilder:
self.shallow_merge = shallow_merge
return self
def with_override_behavior(self, override_behavior: str) -> MergerBuilder:
self.override_behavior = override_behavior
return self
def with_type_strategy(self, cls, merge_strategy: MergeStrategy) -> MergerBuilder:
self.type_strategies.append((cls, merge_strategy))
return self
def with_default_strategy(self, merge_strategy: MergeStrategy) -> MergerBuilder:
self.default_strategy = merge_strategy
return self
def with_type_conflict_strategy(self, merge_strategy: MergeStrategy) -> MergerBuilder:
self.type_conflict_strategy = merge_strategy
return self
def with_transformation(self, cls, *args: Transformation) -> MergerBuilder:
for transformation in args:
self.transformations.append((cls, transformation))
return self
def build(self) -> Merger:
merger = ObjectMerger(
type_strategies=self.type_strategies,
type_conflict_strategy=self.type_conflict_strategy,
default_strategy=self.default_strategy,
transformations=self.transformations,
override_behavior=self.override_behavior,
)
if self.shallow_merge:
return ShallowMerger(merger)
return merger
class Merger(ABC):
@abstractmethod
def merge(self, path: list[str], left: t.Any, right: t.Any) -> t.Any:
raise NotImplementedError
class ObjectMerger(Merger):
def __init__(
self,
type_strategies: list[tuple[type, MergeStrategy]],
type_conflict_strategy: MergeStrategy,
default_strategy: MergeStrategy,
transformations: list[tuple[type, Transformation]],
override_behavior: str,
) -> None:
self.type_strategies = type_strategies
self.type_conflict_strategy = type_conflict_strategy
self.default_strategy = default_strategy
self.transformations = transformations
self.override_behavior = override_behavior
def before_override(self, path: list[str], old_value: t.Any, new_value: t.Any) -> None:
if (
not isinstance(old_value, type(new_value))
and not isinstance(new_value, type(old_value))
or not isinstance(new_value, dict)
and not isinstance(new_value, list)
and old_value != new_value
):
# An override is regarded as a value type conflict or
# when both values are neither dicts nor lists and
# have different values
msg = f"The key '{path[-1]}' with value '{old_value}' will be overwritten with value '{new_value}' from '{'.'.join(path)}'"
if self.override_behavior == "error":
raise AnsibleError(msg)
elif self.override_behavior == "warn":
display.warning(msg)
def transform(self, value: t.Any) -> t.Any:
for cls, transformation in self.transformations:
if isinstance(value, cls):
value = transformation.transform(self, value)
return value
def lookup_merge_strategy(self, left: t.Any, right: t.Any) -> MergeStrategy:
for cls, merge_strategy in self.type_strategies:
if isinstance(left, cls) and isinstance(right, cls):
return merge_strategy
if not isinstance(left, type(right)) and not isinstance(right, type(left)):
return self.type_conflict_strategy
return self.default_strategy
def apply_merge_strategy(self, merge_strategy: MergeStrategy, path: list[str], left: t.Any, right: t.Any) -> t.Any:
return self.transform(merge_strategy.merge(self, path, left, right))
def merge(self, path: list[str], left: t.Any, right: t.Any) -> t.Any:
return self.apply_merge_strategy(self.lookup_merge_strategy(left, right), path, left, right)
class ShallowMerger(Merger):
def __init__(self, merger: ObjectMerger) -> None:
self.merger = merger
def merge(self, path: list[str], left: t.Any, right: t.Any) -> t.Any:
if isinstance(left, dict) and isinstance(right, dict):
return self.merger.apply_merge_strategy(DictMergeStrategies.Merge(), path, left, right)
return self.merger.merge(path, left, right)

View file

@ -44,6 +44,62 @@
vars: vars:
merged_dict: "{{ lookup('community.general.merge_variables', '^.+__merge_dict$') }}" merged_dict: "{{ lookup('community.general.merge_variables', '^.+__merge_dict$') }}"
- name: Test shallow dict merge with 'list_merge=append' and 'dedup'
block:
- name: Print the merged dict
ansible.builtin.debug:
msg: "{{ merged_dict }}"
- name: Validate the merged dict
ansible.builtin.assert:
that: "merged_dict == expected_dict"
vars:
expected_dict:
a: "{{ testdict4__test['a'] }}"
b:
- 3
- 4
- "5": value
- 1
- 2
vars:
merged_dict: "{{
lookup(
'community.general.merge_variables', '__test', pattern_type='suffix', override='ignore',
dict_merge='shallow', list_merge='prepend', list_transformations=['dedup']) }}"
- name: Test deep dict merge with 'list_merge=prepend' and 'dedup' with 'keep=last'
block:
- name: Print the merged dict
ansible.builtin.debug:
msg: "{{ merged_dict }}"
- name: Validate the merged dict
ansible.builtin.assert:
that: "merged_dict == expected_dict"
vars:
expected_dict:
a:
a:
x: low_value
y: high_value
list:
- high_value
- low_value
z: high_value
b:
- 4
- "5": value
- 1
- 2
- 3
vars:
merged_dict: "{{
lookup(
'community.general.merge_variables', '__test', pattern_type='suffix', override='ignore',
dict_merge='deep', list_merge='prepend',
list_transformations=[{'name': 'dedup', 'options': {'keep': 'last'}}]) }}"
# Test the behavior when no results are found # Test the behavior when no results are found
- name: Test merge without results - name: Test merge without results
block: block:

View file

@ -21,6 +21,31 @@ testdict2__merge_dict:
list_item: list_item:
- test2 - test2
testdict3__test:
a:
a:
x: low_value
y: low_value
list:
- low_value
b:
- 1
- 1
- 2
- 3
testdict4__test:
a:
a:
y: high_value
z: high_value
list:
- high_value
b:
- 3
- 4
- 4
- "5": value
override_warn_init: override_warn_init:
key_to_override: Initial value key_to_override: Initial value
override__override_warn: override__override_warn:

View file

@ -5,6 +5,7 @@
from __future__ import annotations from __future__ import annotations
import unittest import unittest
from copy import deepcopy
from unittest.mock import patch from unittest.mock import patch
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
@ -15,6 +16,14 @@ from ansible_collections.community.internal_test_tools.tests.unit.mock.loader im
from ansible_collections.community.general.plugins.lookup import merge_variables from ansible_collections.community.general.plugins.lookup import merge_variables
merge_hash_data = {
"low_dict": {"a": {"a": {"x": "low_value", "y": "low_value", "list": ["low_value"]}}, "b": [1, 1, 2, 3]},
"high_dict": {
"a": {"a": {"y": "high_value", "z": "high_value", "list": ["high_value"]}},
"b": [3, 4, 4, {"5": "value"}],
},
}
class TestMergeVariablesLookup(unittest.TestCase): class TestMergeVariablesLookup(unittest.TestCase):
class HostVarsMock(dict): class HostVarsMock(dict):
@ -33,7 +42,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
self.merge_vars_lookup = merge_variables.LookupModule(loader=self.loader, templar=self.templar) self.merge_vars_lookup = merge_variables.LookupModule(loader=self.loader, templar=self.templar)
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", None]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object(Templar, "template", side_effect=[["item1"], ["item3"]]) @patch.object(Templar, "template", side_effect=[["item1"], ["item3"]])
def test_merge_list(self, mock_set_options, mock_get_option, mock_template): def test_merge_list(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run( results = self.merge_vars_lookup.run(
@ -44,7 +57,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
self.assertEqual(results, [["item1", "item3"]]) self.assertEqual(results, [["item1", "item3"]])
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[["initial_item"], "ignore", "suffix", None]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[["initial_item"], "ignore", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object(Templar, "template", side_effect=[["item1"], ["item3"]]) @patch.object(Templar, "template", side_effect=[["item1"], ["item3"]])
def test_merge_list_with_initial_value(self, mock_set_options, mock_get_option, mock_template): def test_merge_list_with_initial_value(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run( results = self.merge_vars_lookup.run(
@ -55,7 +72,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
self.assertEqual(results, [["initial_item", "item1", "item3"]]) self.assertEqual(results, [["initial_item", "item1", "item3"]])
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", None]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object( @patch.object(
Templar, Templar,
"template", "template",
@ -76,7 +97,17 @@ class TestMergeVariablesLookup(unittest.TestCase):
@patch.object( @patch.object(
AnsiblePlugin, AnsiblePlugin,
"get_option", "get_option",
side_effect=[{"initial_item": "random value", "list_item": ["test0"]}, "ignore", "suffix", None], side_effect=[
{"initial_item": "random value", "list_item": ["test0"]},
"ignore",
"suffix",
None,
"deep",
"append",
"replace",
"replace",
[],
],
) )
@patch.object( @patch.object(
Templar, Templar,
@ -105,7 +136,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
) )
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "warn", "suffix", None]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "warn", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object(Templar, "template", side_effect=[{"item": "value1"}, {"item": "value2"}]) @patch.object(Templar, "template", side_effect=[{"item": "value1"}, {"item": "value2"}])
@patch.object(Display, "warning") @patch.object(Display, "warning")
def test_merge_dict_non_unique_warning(self, mock_set_options, mock_get_option, mock_template, mock_display): def test_merge_dict_non_unique_warning(self, mock_set_options, mock_get_option, mock_template, mock_display):
@ -118,7 +153,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
self.assertEqual(results, [{"item": "value2"}]) self.assertEqual(results, [{"item": "value2"}])
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "error", "suffix", None]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "error", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object(Templar, "template", side_effect=[{"item": "value1"}, {"item": "value2"}]) @patch.object(Templar, "template", side_effect=[{"item": "value1"}, {"item": "value2"}])
def test_merge_dict_non_unique_error(self, mock_set_options, mock_get_option, mock_template): def test_merge_dict_non_unique_error(self, mock_set_options, mock_get_option, mock_template):
with self.assertRaises(AnsibleError): with self.assertRaises(AnsibleError):
@ -128,7 +167,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
) )
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", None]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object(Templar, "template", side_effect=[{"item1": "test", "list_item": ["test1"]}, ["item2", "item3"]]) @patch.object(Templar, "template", side_effect=[{"item1": "test", "list_item": ["test1"]}, ["item2", "item3"]])
def test_merge_list_and_dict(self, mock_set_options, mock_get_option, mock_template): def test_merge_list_and_dict(self, mock_set_options, mock_get_option, mock_template):
with self.assertRaises(AnsibleError): with self.assertRaises(AnsibleError):
@ -141,7 +184,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
) )
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["all"]]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", ["all"], "deep", "append", "replace", "replace", []],
)
@patch.object( @patch.object(
Templar, Templar,
"template", "template",
@ -174,7 +221,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
) )
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["dummy1"]]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", ["dummy1"], "deep", "append", "replace", "replace", []],
)
@patch.object( @patch.object(
Templar, Templar,
"template", "template",
@ -212,7 +263,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
) )
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"]]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"], "deep", "append", "replace", "replace", []],
)
@patch.object( @patch.object(
Templar, Templar,
"template", "template",
@ -249,7 +304,11 @@ class TestMergeVariablesLookup(unittest.TestCase):
) )
@patch.object(AnsiblePlugin, "set_options") @patch.object(AnsiblePlugin, "set_options")
@patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"]]) @patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"], "deep", "append", "replace", "replace", []],
)
@patch.object( @patch.object(
Templar, Templar,
"template", "template",
@ -270,3 +329,371 @@ class TestMergeVariablesLookup(unittest.TestCase):
results = self.merge_vars_lookup.run(["__merge_var"], variables) results = self.merge_vars_lookup.run(["__merge_var"], variables)
self.assertEqual(results, [["item1", "item5"]]) self.assertEqual(results, [["item1", "item5"]])
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "shallow", "replace", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_shallow_and_list_replace(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_shallow_and_list_replace"],
{
"testdict1__merge_dict_shallow_and_list_replace": merge_hash_data["low_dict"],
"testdict2__merge_dict_shallow_and_list_replace": merge_hash_data["high_dict"],
},
)
self.assertEqual(results, [merge_hash_data["high_dict"]])
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "shallow", "keep", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_shallow_and_list_keep(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_shallow_and_list_keep"],
{
"testdict1__merge_dict_shallow_and_list_keep": merge_hash_data["low_dict"],
"testdict2__merge_dict_shallow_and_list_keep": merge_hash_data["high_dict"],
},
)
self.assertEqual(results, [{"a": merge_hash_data["high_dict"]["a"], "b": merge_hash_data["low_dict"]["b"]}])
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "shallow", "append", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_shallow_and_list_append(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_shallow_and_list_append"],
{
"testdict1__merge_dict_shallow_and_list_append": merge_hash_data["low_dict"],
"testdict2__merge_dict_shallow_and_list_append": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": merge_hash_data["high_dict"]["a"],
"b": merge_hash_data["low_dict"]["b"] + merge_hash_data["high_dict"]["b"],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "shallow", "prepend", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_shallow_and_list_prepend(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_shallow_and_list_prepend"],
{
"testdict1__merge_dict_shallow_and_list_prepend": merge_hash_data["low_dict"],
"testdict2__merge_dict_shallow_and_list_prepend": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": merge_hash_data["high_dict"]["a"],
"b": merge_hash_data["high_dict"]["b"] + merge_hash_data["low_dict"]["b"],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "replace", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_deep_and_list_replace(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_deep_and_list_replace"],
{
"testdict1__merge_dict_deep_and_list_replace": merge_hash_data["low_dict"],
"testdict2__merge_dict_deep_and_list_replace": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": {"a": {"x": "low_value", "y": "high_value", "z": "high_value", "list": ["high_value"]}},
"b": merge_hash_data["high_dict"]["b"],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "keep", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_deep_and_list_keep(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_deep_and_list_keep"],
{
"testdict1__merge_dict_deep_and_list_keep": merge_hash_data["low_dict"],
"testdict2__merge_dict_deep_and_list_keep": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": {"a": {"x": "low_value", "y": "high_value", "z": "high_value", "list": ["low_value"]}},
"b": merge_hash_data["low_dict"]["b"],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_deep_and_list_append(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_deep_and_list_append"],
{
"testdict1__merge_dict_deep_and_list_append": merge_hash_data["low_dict"],
"testdict2__merge_dict_deep_and_list_append": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": {
"a": {
"x": "low_value",
"y": "high_value",
"z": "high_value",
"list": ["low_value", "high_value"],
}
},
"b": merge_hash_data["low_dict"]["b"] + merge_hash_data["high_dict"]["b"],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "prepend", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_deep_and_list_prepend(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_deep_and_list_prepend"],
{
"testdict1__merge_dict_deep_and_list_prepend": merge_hash_data["low_dict"],
"testdict2__merge_dict_deep_and_list_prepend": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": {
"a": {
"x": "low_value",
"y": "high_value",
"z": "high_value",
"list": ["high_value", "low_value"],
}
},
"b": merge_hash_data["high_dict"]["b"] + merge_hash_data["low_dict"]["b"],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "replace", "append", "keep", "keep", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_replace(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_replace"],
{
"testdict1__merge_dict_replace": merge_hash_data["low_dict"],
"testdict2__merge_dict_replace": merge_hash_data["high_dict"],
},
)
self.assertEqual(results, [merge_hash_data["high_dict"]])
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "keep", "append", "replace", "replace", []],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_keep(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_dict_keep"],
{
"testdict1__merge_dict_keep": merge_hash_data["low_dict"],
"testdict2__merge_dict_keep": merge_hash_data["high_dict"],
},
)
self.assertEqual(results, [merge_hash_data["low_dict"]])
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "merge", "replace", "replace", []],
)
@patch.object(
Templar,
"template",
side_effect=[[{"a": "b", "c": "d"}, {"c": "d"}], [{"a": "x", "y": "z"}, {"c": "z"}, {"a": "y"}]],
)
def test_merge_list_deep(self, mock_set_options, mock_get_option, mock_template):
results = self.merge_vars_lookup.run(
["__merge_list_deep"],
{
"testdict1__merge_list_deep": [{"a": "b", "c": "d"}, {"c": "d"}],
"testdict2__merge_list_deep": [{"a": "x", "y": "z"}, {"c": "z"}, {"a": "y"}],
},
)
self.assertEqual(results, [[{"a": "x", "c": "d", "y": "z"}, {"c": "z"}, {"a": "y"}]])
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", ["dedup"]],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_deep_and_list_append_with_dedup_keep_first(
self, mock_set_options, mock_get_option, mock_template
):
results = self.merge_vars_lookup.run(
["__merge_dict_deep_and_list_append_with_dedup_keep_first"],
{
"testdict1__merge_dict_deep_and_list_append_with_dedup_keep_first": merge_hash_data["low_dict"],
"testdict2__merge_dict_deep_and_list_append_with_dedup_keep_first": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": {
"a": {
"x": "low_value",
"y": "high_value",
"z": "high_value",
"list": ["low_value", "high_value"],
}
},
"b": [1, 2, 3, 4, {"5": "value"}],
}
],
)
@patch.object(AnsiblePlugin, "set_options")
@patch.object(
AnsiblePlugin,
"get_option",
side_effect=[
None,
"ignore",
"suffix",
None,
"deep",
"prepend",
"replace",
"replace",
[{"name": "dedup", "options": {"keep": "last"}}],
],
)
@patch.object(
Templar, "template", side_effect=[deepcopy(merge_hash_data["low_dict"]), deepcopy(merge_hash_data["high_dict"])]
)
def test_merge_dict_deep_and_list_prepend_with_dedup_keep_last(
self, mock_set_options, mock_get_option, mock_template
):
results = self.merge_vars_lookup.run(
["__merge_dict_deep_and_list_prepend_with_dedup_keep_last"],
{
"testdict1__merge_dict_deep_and_list_prepend_with_dedup_keep_last": merge_hash_data["low_dict"],
"testdict2__merge_dict_deep_and_list_prepend_with_dedup_keep_last": merge_hash_data["high_dict"],
},
)
self.assertEqual(
results,
[
{
"a": {
"a": {
"x": "low_value",
"y": "high_value",
"list": ["high_value", "low_value"],
"z": "high_value",
}
},
"b": [4, {"5": "value"}, 1, 2, 3],
}
],
)