diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index e36914d809..1645450936 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -312,7 +312,7 @@ files: $lookups/lmdb_kv.py: maintainers: jpmens $lookups/merge_variables.py: - maintainers: rlenferink m-a-r-k-e alpex8 + maintainers: rlenferink m-a-r-k-e alpex8 cfiehe $lookups/onepass: labels: onepassword maintainers: samdoran diff --git a/changelogs/fragments/11536-merge-variables-extended-merging-capabilities.yml b/changelogs/fragments/11536-merge-variables-extended-merging-capabilities.yml new file mode 100644 index 0000000000..b9397ae2ae --- /dev/null +++ b/changelogs/fragments/11536-merge-variables-extended-merging-capabilities.yml @@ -0,0 +1,2 @@ +minor_changes: + - merge_variables - extended merging capabilities added (https://github.com/ansible-collections/community.general/pull/11536). diff --git a/plugins/lookup/merge_variables.py b/plugins/lookup/merge_variables.py index 8866bac592..846cd3d7c9 100644 --- a/plugins/lookup/merge_variables.py +++ b/plugins/lookup/merge_variables.py @@ -9,6 +9,7 @@ author: - Roy Lenferink (@rlenferink) - Mark Ettema (@m-a-r-k-e) - Alexander Petrenz (@alpex8) + - Christoph Fiehe (@cfiehe) name: merge_variables short_description: Merge variables whose names match a given pattern description: @@ -66,6 +67,70 @@ options: type: list elements: str version_added: 8.5.0 + dict_merge: + description: + - Behavior when encountering dictionary values. + type: str + default: deep + choices: + deep: merge dictionaries recursively + shallow: merge only top-level values + replace: overwrite older dict with newer one + keep: discard newer dict + version_added: 12.5.0 + list_merge: + description: + - Behavior when encountering list values. + type: str + default: append + choices: + replace: overwrite older entries with newer ones + keep: discard newer entries + append: append newer entries to the older ones + prepend: insert newer entries in front of the older ones + append_rp: append newer entries to the older ones, overwrite duplicates + prepend_rp: insert newer entries in front of the older ones, discard duplicates + merge: take the index as key and merge the entries + version_added: 12.5.0 + type_conflict_merge: + description: + - Merge strategy to apply on type conflicts. + type: str + default: replace + choices: + replace: overwrite older value with newer one + keep: discard newer value + version_added: 12.5.0 + default_merge: + description: + - Merge strategy applied to other types. + type: str + default: replace + choices: + replace: overwrite older value with newer one + keep: discard newer value + version_added: 12.5.0 + list_transformations: + description: + - List transformations applied to list types. The definition order corresponds to the order in which these transformations are applied. + - Elements can be a dict with the keys mentioned below or a string naming the transformation to apply. + type: list + elements: raw + suboptions: + name: + description: + - Name of the list transformation. + required: true + type: str + choices: + flatten: flatten lists, converting nested lists into single lists + dedup: remove duplicates from lists + options: + description: + - Options as key value pairs. V(flatten) and V(dedup) do not support any additional options. + type: dict + default: [] + version_added: 12.5.0 """ EXAMPLES = r""" @@ -115,15 +180,21 @@ _raw: """ import re +import typing as t +from abc import ABC, abstractmethod + +if t.TYPE_CHECKING: + from collections.abc import Callable from ansible.errors import AnsibleError from ansible.plugins.lookup import LookupBase +from ansible.template import Templar from ansible.utils.display import Display display = Display() -def _verify_and_get_type(variable): +def _verify_and_get_type(variable: t.Any) -> str: if isinstance(variable, list): return "list" elif isinstance(variable, dict): @@ -133,12 +204,17 @@ def _verify_and_get_type(variable): class LookupModule(LookupBase): - def run(self, terms, variables=None, **kwargs): + def run(self, terms: list[str], variables: dict[str, t.Any], **kwargs) -> list[t.Any]: self.set_options(direct=kwargs) initial_value = self.get_option("initial_value", None) - self._override = self.get_option("override", "error") + self._override_behavior = self.get_option("override", "error") self._pattern_type = self.get_option("pattern_type", "regex") self._groups = self.get_option("groups", None) + self._dict_merge = self.get_option("dict_merge", "deep") + self._list_merge = self.get_option("list_merge", "append") + self._type_conflict_merge = self.get_option("type_conflict_merge", "replace") + self._default_merge = self.get_option("default_merge", "replace") + self._list_transformations = self.get_option("list_transformations", []) ret = [] for term in terms: @@ -158,25 +234,25 @@ class LookupModule(LookupBase): return ret - def _is_host_in_allowed_groups(self, host_groups): + def _is_host_in_allowed_groups(self, host_groups: list[str]) -> bool: if "all" in self._groups: return True group_intersection = [host_group_name for host_group_name in host_groups if host_group_name in self._groups] return bool(group_intersection) - def _var_matches(self, key, search_pattern): + def _var_matches(self, key: str, search_pattern: str) -> bool: if self._pattern_type == "prefix": return key.startswith(search_pattern) elif self._pattern_type == "suffix": return key.endswith(search_pattern) elif self._pattern_type == "regex": matcher = re.compile(search_pattern) - return matcher.search(key) + return matcher.search(key) is not None return False - def _merge_vars(self, search_pattern, initial_value, variables): + def _merge_vars(self, search_pattern: str, initial_value: t.Any, variables: dict[str, t.Any]) -> t.Any: display.vvv(f"Merge variables with {self._pattern_type}: {search_pattern}") var_merge_names = sorted([key for key in variables.keys() if self._var_matches(key, search_pattern)]) display.vvv(f"The following variables will be merged: {var_merge_names}") @@ -187,11 +263,42 @@ class LookupModule(LookupBase): prev_var_type = _verify_and_get_type(initial_value) result = initial_value + builder = ( + MergerBuilder() + .with_type_strategy(list, ListMergeStrategies.from_name(self._list_merge)) + .with_type_conflict_strategy(BaseMergeStrategies.from_name(self._type_conflict_merge)) + .with_default_strategy(BaseMergeStrategies.from_name(self._default_merge)) + .with_override_behavior(self._override_behavior) + ) + + if self._dict_merge == "deep": + builder.with_type_strategy(dict, DictMergeStrategies.Merge()) + elif self._dict_merge == "shallow": + builder.with_shallow_merge() + else: + builder.with_type_strategy(dict, DictMergeStrategies.from_name(self._dict_merge)) + + for transformation in self._list_transformations: + if isinstance(transformation, str): + builder.with_transformation(list, ListTransformations.from_name(transformation)) + elif isinstance(transformation, dict): + name = transformation["name"] + options = transformation.get("options", {}) + builder.with_transformation(list, ListTransformations.from_name(name, **options)) + else: + raise AnsibleError( + f"Transformations must be specified through values of type 'str' or 'dict', but a value of type '{type(transformation)}' was given." + ) + + merger = builder.build() + + if self._templar is not None: + templar = self._templar.copy_with_new_env(available_variables=variables) + else: + templar = Templar(loader=self._loader, variables=variables) + for var_name in var_merge_names: - temp_templar = self._templar.copy_with_new_env( - available_variables=variables - ) # tmp. switch renderer to context of current variables - var_value = temp_templar.template(variables[var_name]) # Render jinja2 templates + var_value = templar.template(variables[var_name]) # Render jinja2 templates var_type = _verify_and_get_type(var_value) if prev_var_type is None: @@ -203,29 +310,346 @@ class LookupModule(LookupBase): result = var_value continue - if var_type == "dict": - result = self._merge_dict(var_value, result, [var_name]) - else: # var_type == "list" - result += var_value + result = merger.merge(path=[var_name], left=result, right=var_value) return result - def _merge_dict(self, src, dest, path): - for key, value in src.items(): - if isinstance(value, dict): - node = dest.setdefault(key, {}) - self._merge_dict(value, node, path + [key]) - elif isinstance(value, list) and key in dest: - dest[key] += value - else: - if (key in dest) and dest[key] != value: - msg = f"The key '{key}' with value '{dest[key]}' will be overwritten with value '{value}' from '{'.'.join(path)}.{key}'" - if self._override == "error": - raise AnsibleError(msg) - if self._override == "warn": - display.warning(msg) +class MergeStrategy(ABC): + """ + Implements custom merge logic for combining two values. + """ - dest[key] = value + @abstractmethod + def merge(self, merger: ObjectMerger, path: list[str], left: t.Any, right: t.Any) -> t.Any: + raise NotImplementedError - return dest + +class MergeStrategies: + @classmethod + def from_name(cls, name: str, *args, **kwargs) -> MergeStrategy: + if name in cls.strategies: + return cls.strategies[name](*args, **kwargs) + else: + raise AnsibleError(f"Unknown merge strategy '{name}'") + + strategies: dict[str, Callable[..., t.Any]] = {} + + +class BaseMergeStrategies(MergeStrategies): + """ + Collection of base merge strategies. + """ + + class Replace(MergeStrategy): + """ + Overwrite older value with newer one. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: t.Any, right: t.Any) -> t.Any: + return right + + class Keep(MergeStrategy): + """ + Discard newer value. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: t.Any, right: t.Any) -> t.Any: + return left + + strategies = { + "replace": Replace, + "keep": Keep, + } + + +class DictMergeStrategies(MergeStrategies): + """ + Collection of dictionary merge strategies. + """ + + class Merge(MergeStrategy): + """ + Recursively merge dictionaries. + """ + + def merge( + self, merger: ObjectMerger, path: list[str], left: dict[str, t.Any], right: dict[str, t.Any] + ) -> dict[str, t.Any]: + result = left + for key, value in right.items(): + if key not in result: + result[key] = value + else: + path.append(key) + merged = merger.merge(path=path, left=result[key], right=value) + merger.before_override(path=path, old_value=result[key], new_value=merged) + result[key] = merged + + return result + + strategies = { + "merge": Merge, + "replace": BaseMergeStrategies.Replace, + "keep": BaseMergeStrategies.Keep, + } + + +class ListMergeStrategies(MergeStrategies): + """ + Collection of list merge strategies. + """ + + class Append(MergeStrategy): + """ + Append newer entries to the older ones. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]: + return left + right + + class Prepend(MergeStrategy): + """ + Insert newer entries in front of the older ones. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]: + return right + left + + class AppendRp(MergeStrategy): + """ + Append newer entries to the older ones, overwrite duplicates. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]: + return [item for item in left if item not in right] + right + + class PrependRp(MergeStrategy): + """ + Insert newer entries in front of the older ones, discard duplicates. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]: + return right + [item for item in left if item not in right] + + class Merge(MergeStrategy): + """ + Take the index as key and merge the entries. + """ + + def merge(self, merger: ObjectMerger, path: list[str], left: list[t.Any], right: list[t.Any]) -> list[t.Any]: + result = left + for i, value in enumerate(right): + if i >= len(result): + result.extend(right[i:]) + break + + path.append(str(i)) + merged = merger.merge(path=path, left=result[i], right=value) + merger.before_override(path=path, old_value=result[i], new_value=merged) + result[i] = merged + + return result + + strategies = { + "append": Append, + "prepend": Prepend, + "append_rp": AppendRp, + "prepend_rp": PrependRp, + "merge": Merge, + "replace": BaseMergeStrategies.Replace, + "keep": BaseMergeStrategies.Keep, + } + + +class Transformation(ABC): + """ + Implements custom transformation logic for converting a value to another representation. + """ + + @abstractmethod + def transform(self, merger: ObjectMerger, value: t.Any) -> t.Any: + raise NotImplementedError + + +class Transformations: + @classmethod + def from_name(cls, name: str, *args, **kwargs) -> Transformation: + if name in cls.transformations: + return cls.transformations[name](*args, **kwargs) + else: + raise AnsibleError(f"Unknown transformation '{name}'") + + transformations: dict[str, Callable[..., t.Any]] = {} + + +class ListTransformations(Transformations): + """ + Collection of list transformations. + """ + + class Dedup(Transformation): + """ + Removes duplicates from a list. + """ + + def transform(self, merger: ObjectMerger, value: list[t.Any]) -> list[t.Any]: + result = [] + for item in value: + if item not in result: + result.append(item) + return result + + class Flatten(Transformation): + """ + Takes a list and replaces any elements that are lists with a flattened sequence of the list contents. + If any of the nested lists also contain directly-nested lists, these are flattened recursively. + """ + + def transform(self, merger: ObjectMerger, value: list[t.Any]) -> list[t.Any]: + result = [] + for item in value: + if isinstance(item, list): + result.extend(self.transform(merger, item)) + else: + result.append(item) + return result + + transformations = { + "dedup": Dedup, + "flatten": Flatten, + } + + +class MergerBuilder: + """ + Helper that builds a merger based on provided strategies. + """ + + def __init__(self) -> None: + self.type_strategies: list[tuple[type, MergeStrategy]] = [] + self.type_conflict_strategy: MergeStrategy = BaseMergeStrategies.Replace() + self.default_strategy: MergeStrategy = BaseMergeStrategies.Replace() + self.transformations: list[tuple[type, Transformation]] = [] + self.override_behavior: str = "error" + self.shallow_merge: bool = False + + def with_shallow_merge(self, shallow_merge: bool = True) -> MergerBuilder: + self.shallow_merge = shallow_merge + return self + + def with_override_behavior(self, override_behavior: str) -> MergerBuilder: + self.override_behavior = override_behavior + return self + + def with_type_strategy(self, cls, merge_strategy: MergeStrategy) -> MergerBuilder: + self.type_strategies.append((cls, merge_strategy)) + return self + + def with_default_strategy(self, merge_strategy: MergeStrategy) -> MergerBuilder: + self.default_strategy = merge_strategy + return self + + def with_type_conflict_strategy(self, merge_strategy: MergeStrategy) -> MergerBuilder: + self.type_conflict_strategy = merge_strategy + return self + + def with_transformation(self, cls, *args: Transformation) -> MergerBuilder: + for transformation in args: + self.transformations.append((cls, transformation)) + return self + + def build(self) -> Merger: + merger = ObjectMerger( + type_strategies=self.type_strategies, + type_conflict_strategy=self.type_conflict_strategy, + default_strategy=self.default_strategy, + transformations=self.transformations, + override_behavior=self.override_behavior, + ) + + if self.shallow_merge: + return ShallowMerger(merger) + + return merger + + +class Merger(ABC): + @abstractmethod + def merge(self, path: list[str], left: t.Any, right: t.Any) -> t.Any: + raise NotImplementedError + + +class ObjectMerger(Merger): + """ + Merges objects based on provided strategies. + """ + + def __init__( + self, + type_strategies: list[tuple[type, MergeStrategy]], + type_conflict_strategy: MergeStrategy, + default_strategy: MergeStrategy, + transformations: list[tuple[type, Transformation]], + override_behavior: str, + ) -> None: + self.type_strategies = type_strategies + self.type_conflict_strategy = type_conflict_strategy + self.default_strategy = default_strategy + self.transformations = transformations + self.override_behavior = override_behavior + + def before_override(self, path: list[str], old_value: t.Any, new_value: t.Any) -> None: + if ( + not isinstance(old_value, type(new_value)) + and not isinstance(new_value, type(old_value)) + or not isinstance(new_value, dict) + and not isinstance(new_value, list) + and old_value != new_value + ): + # This behavior has been implemented for reasons of backward compatibility. + # An override is regarded as a value type conflict or + # when both values are neither dicts nor lists and + # have different values + msg = f"The key '{path[-1]}' with value '{old_value}' will be overwritten with value '{new_value}' from '{'.'.join(path)}'" + if self.override_behavior == "error": + raise AnsibleError(msg) + elif self.override_behavior == "warn": + display.warning(msg) + + def transform(self, value: t.Any) -> t.Any: + for cls, transformation in self.transformations: + if isinstance(value, cls): + value = transformation.transform(self, value) + return value + + def lookup_merge_strategy(self, left: t.Any, right: t.Any) -> MergeStrategy: + for cls, merge_strategy in self.type_strategies: + if isinstance(left, cls) and isinstance(right, cls): + return merge_strategy + + if not isinstance(left, type(right)) and not isinstance(right, type(left)): + return self.type_conflict_strategy + + return self.default_strategy + + def apply_merge_strategy(self, merge_strategy: MergeStrategy, path: list[str], left: t.Any, right: t.Any) -> t.Any: + return self.transform(merge_strategy.merge(self, path, left, right)) + + def merge(self, path: list[str], left: t.Any, right: t.Any) -> t.Any: + return self.apply_merge_strategy(self.lookup_merge_strategy(left, right), path, left, right) + + +class ShallowMerger(Merger): + """ + Wrapper around merger that combines the top-level values of two given dictionaries. + """ + + def __init__(self, merger: ObjectMerger) -> None: + self.merger = merger + + def merge(self, path: list[str], left: t.Any, right: t.Any) -> t.Any: + if isinstance(left, dict) and isinstance(right, dict): + return self.merger.apply_merge_strategy(DictMergeStrategies.Merge(), path, left, right) + + return self.merger.merge(path, left, right) diff --git a/tests/unit/plugins/lookup/test_merge_variables.py b/tests/unit/plugins/lookup/test_merge_variables.py index 94bc8c604b..ae6c14592d 100644 --- a/tests/unit/plugins/lookup/test_merge_variables.py +++ b/tests/unit/plugins/lookup/test_merge_variables.py @@ -5,6 +5,7 @@ from __future__ import annotations import unittest +from copy import deepcopy from unittest.mock import patch from ansible.errors import AnsibleError @@ -15,6 +16,14 @@ from ansible_collections.community.internal_test_tools.tests.unit.mock.loader im from ansible_collections.community.general.plugins.lookup import merge_variables +merge_hash_data = { + "low_prio": {"a": {"a": {"x": "low_value", "y": "low_value", "list": ["low_value"]}}, "b": [1, 1, 2, 3]}, + "high_prio": { + "a": {"a": {"y": "high_value", "z": "high_value", "list": ["high_value"]}}, + "b": [3, 4, 4, {"5": "value"}], + }, +} + class TestMergeVariablesLookup(unittest.TestCase): class HostVarsMock(dict): @@ -33,7 +42,11 @@ class TestMergeVariablesLookup(unittest.TestCase): self.merge_vars_lookup = merge_variables.LookupModule(loader=self.loader, templar=self.templar) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", None]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []], + ) @patch.object(Templar, "template", side_effect=[["item1"], ["item3"]]) def test_merge_list(self, mock_set_options, mock_get_option, mock_template): results = self.merge_vars_lookup.run( @@ -44,7 +57,11 @@ class TestMergeVariablesLookup(unittest.TestCase): self.assertEqual(results, [["item1", "item3"]]) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[["initial_item"], "ignore", "suffix", None]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[["initial_item"], "ignore", "suffix", None, "deep", "append", "replace", "replace", []], + ) @patch.object(Templar, "template", side_effect=[["item1"], ["item3"]]) def test_merge_list_with_initial_value(self, mock_set_options, mock_get_option, mock_template): results = self.merge_vars_lookup.run( @@ -55,7 +72,11 @@ class TestMergeVariablesLookup(unittest.TestCase): self.assertEqual(results, [["initial_item", "item1", "item3"]]) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", None]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []], + ) @patch.object( Templar, "template", @@ -76,7 +97,17 @@ class TestMergeVariablesLookup(unittest.TestCase): @patch.object( AnsiblePlugin, "get_option", - side_effect=[{"initial_item": "random value", "list_item": ["test0"]}, "ignore", "suffix", None], + side_effect=[ + {"initial_item": "random value", "list_item": ["test0"]}, + "ignore", + "suffix", + None, + "deep", + "append", + "replace", + "replace", + [], + ], ) @patch.object( Templar, @@ -105,7 +136,11 @@ class TestMergeVariablesLookup(unittest.TestCase): ) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "warn", "suffix", None]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "warn", "suffix", None, "deep", "append", "replace", "replace", []], + ) @patch.object(Templar, "template", side_effect=[{"item": "value1"}, {"item": "value2"}]) @patch.object(Display, "warning") def test_merge_dict_non_unique_warning(self, mock_set_options, mock_get_option, mock_template, mock_display): @@ -118,7 +153,11 @@ class TestMergeVariablesLookup(unittest.TestCase): self.assertEqual(results, [{"item": "value2"}]) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "error", "suffix", None]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "error", "suffix", None, "deep", "append", "replace", "replace", []], + ) @patch.object(Templar, "template", side_effect=[{"item": "value1"}, {"item": "value2"}]) def test_merge_dict_non_unique_error(self, mock_set_options, mock_get_option, mock_template): with self.assertRaises(AnsibleError): @@ -128,7 +167,11 @@ class TestMergeVariablesLookup(unittest.TestCase): ) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", None]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []], + ) @patch.object(Templar, "template", side_effect=[{"item1": "test", "list_item": ["test1"]}, ["item2", "item3"]]) def test_merge_list_and_dict(self, mock_set_options, mock_get_option, mock_template): with self.assertRaises(AnsibleError): @@ -141,7 +184,11 @@ class TestMergeVariablesLookup(unittest.TestCase): ) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["all"]]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", ["all"], "deep", "append", "replace", "replace", []], + ) @patch.object( Templar, "template", @@ -174,7 +221,11 @@ class TestMergeVariablesLookup(unittest.TestCase): ) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["dummy1"]]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", ["dummy1"], "deep", "append", "replace", "replace", []], + ) @patch.object( Templar, "template", @@ -212,7 +263,11 @@ class TestMergeVariablesLookup(unittest.TestCase): ) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"]]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"], "deep", "append", "replace", "replace", []], + ) @patch.object( Templar, "template", @@ -249,7 +304,11 @@ class TestMergeVariablesLookup(unittest.TestCase): ) @patch.object(AnsiblePlugin, "set_options") - @patch.object(AnsiblePlugin, "get_option", side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"]]) + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", ["dummy1", "dummy2"], "deep", "append", "replace", "replace", []], + ) @patch.object( Templar, "template", @@ -270,3 +329,436 @@ class TestMergeVariablesLookup(unittest.TestCase): results = self.merge_vars_lookup.run(["__merge_var"], variables) self.assertEqual(results, [["item1", "item5"]]) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "shallow", "replace", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_shallow_and_list_replace(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_shallow_and_list_replace"], + { + "testdict1__merge_dict_shallow_and_list_replace": merge_hash_data["low_prio"], + "testdict2__merge_dict_shallow_and_list_replace": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual(results, [merge_hash_data["high_prio"]]) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "shallow", "keep", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_shallow_and_list_keep(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_shallow_and_list_keep"], + { + "testdict1__merge_dict_shallow_and_list_keep": merge_hash_data["low_prio"], + "testdict2__merge_dict_shallow_and_list_keep": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual(results, [{"a": merge_hash_data["high_prio"]["a"], "b": merge_hash_data["low_prio"]["b"]}]) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "shallow", "append", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_shallow_and_list_append(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_shallow_and_list_append"], + { + "testdict1__merge_dict_shallow_and_list_append": merge_hash_data["low_prio"], + "testdict2__merge_dict_shallow_and_list_append": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": merge_hash_data["high_prio"]["a"], + "b": merge_hash_data["low_prio"]["b"] + merge_hash_data["high_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "shallow", "prepend", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_shallow_and_list_prepend(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_shallow_and_list_prepend"], + { + "testdict1__merge_dict_shallow_and_list_prepend": merge_hash_data["low_prio"], + "testdict2__merge_dict_shallow_and_list_prepend": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": merge_hash_data["high_prio"]["a"], + "b": merge_hash_data["high_prio"]["b"] + merge_hash_data["low_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "shallow", "append_rp", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_shallow_and_list_append_rp(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_shallow_and_list_append_rp"], + { + "testdict1__merge_dict_shallow_and_list_append_rp": merge_hash_data["low_prio"], + "testdict2__merge_dict_shallow_and_list_append_rp": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, [{"a": merge_hash_data["high_prio"]["a"], "b": [1, 1, 2] + merge_hash_data["high_prio"]["b"]}] + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "shallow", "prepend_rp", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_shallow_and_list_prepend_rp(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_shallow_and_list_prepend_rp"], + { + "testdict1__merge_dict_shallow_and_list_prepend_rp": merge_hash_data["low_prio"], + "testdict2__merge_dict_shallow_and_list_prepend_rp": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, [{"a": merge_hash_data["high_prio"]["a"], "b": merge_hash_data["high_prio"]["b"] + [1, 1, 2]}] + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "replace", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_replace(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_replace"], + { + "testdict1__merge_dict_deep_and_list_replace": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_replace": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": {"a": {"x": "low_value", "y": "high_value", "z": "high_value", "list": ["high_value"]}}, + "b": merge_hash_data["high_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "keep", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_keep(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_keep"], + { + "testdict1__merge_dict_deep_and_list_keep": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_keep": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": {"a": {"x": "low_value", "y": "high_value", "z": "high_value", "list": ["low_value"]}}, + "b": merge_hash_data["low_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_append(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_append"], + { + "testdict1__merge_dict_deep_and_list_append": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_append": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": { + "a": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["low_value", "high_value"], + } + }, + "b": merge_hash_data["low_prio"]["b"] + merge_hash_data["high_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "prepend", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_prepend(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_prepend"], + { + "testdict1__merge_dict_deep_and_list_prepend": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_prepend": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": { + "a": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["high_value", "low_value"], + } + }, + "b": merge_hash_data["high_prio"]["b"] + merge_hash_data["low_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "append_rp", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_append_rp(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_append_rp"], + { + "testdict1__merge_dict_deep_and_list_append_rp": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_append_rp": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": { + "a": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["low_value", "high_value"], + } + }, + "b": [1, 1, 2] + merge_hash_data["high_prio"]["b"], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "prepend_rp", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_prepend_rp(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_prepend_rp"], + { + "testdict1__merge_dict_deep_and_list_prepend_rp": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_prepend_rp": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": { + "a": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["high_value", "low_value"], + } + }, + "b": merge_hash_data["high_prio"]["b"] + [1, 1, 2], + } + ], + ) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "replace", "append", "keep", "keep", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_replace(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_replace"], + { + "testdict1__merge_dict_replace": merge_hash_data["low_prio"], + "testdict2__merge_dict_replace": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual(results, [merge_hash_data["high_prio"]]) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "keep", "append", "replace", "replace", []], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_keep(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_keep"], + { + "testdict1__merge_dict_keep": merge_hash_data["low_prio"], + "testdict2__merge_dict_keep": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual(results, [merge_hash_data["low_prio"]]) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "merge", "replace", "replace", []], + ) + @patch.object( + Templar, + "template", + side_effect=[[{"a": "b", "c": "d"}, {"c": "d"}], [{"a": "x", "y": "z"}, {"c": "z"}, {"a": "y"}]], + ) + def test_merge_list_deep(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_list_deep"], + { + "testdict1__merge_list_deep": [{"a": "b", "c": "d"}, {"c": "d"}], + "testdict2__merge_list_deep": [{"a": "x", "y": "z"}, {"c": "z"}, {"a": "y"}], + }, + ) + + self.assertEqual(results, [[{"a": "x", "c": "d", "y": "z"}, {"c": "z"}, {"a": "y"}]]) + + @patch.object(AnsiblePlugin, "set_options") + @patch.object( + AnsiblePlugin, + "get_option", + side_effect=[None, "ignore", "suffix", None, "deep", "append", "replace", "replace", [{"name": "dedup"}]], + ) + @patch.object( + Templar, "template", side_effect=[deepcopy(merge_hash_data["low_prio"]), deepcopy(merge_hash_data["high_prio"])] + ) + def test_merge_dict_deep_and_list_dedup(self, mock_set_options, mock_get_option, mock_template): + results = self.merge_vars_lookup.run( + ["__merge_dict_deep_and_list_dedup"], + { + "testdict1__merge_dict_deep_and_list_dedup": merge_hash_data["low_prio"], + "testdict2__merge_dict_deep_and_list_dedup": merge_hash_data["high_prio"], + }, + ) + + self.assertEqual( + results, + [ + { + "a": { + "a": { + "x": "low_value", + "y": "high_value", + "z": "high_value", + "list": ["low_value", "high_value"], + } + }, + "b": [1, 2, 3, 4, {"5": "value"}], + } + ], + )