1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-06-11 02:25:36 +00:00
community.general/plugins/module_utils/_xml.py
2026-06-08 16:50:31 +05:30

142 lines
5.3 KiB
Python

# Copyright (c) 2014, Red Hat, Inc.
# Copyright (c) 2014, Tim Bielawa <tbielawa@redhat.com>
# Copyright (c) 2014, Magnus Hedemark <mhedemar@redhat.com>
# Copyright (c) 2017, Dag Wieers <dag@wieers.com>
# Copyright (c) 2026, Shreyash Bhosale <shreyashpb16@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
# Note that this module util is **PRIVATE** to the collection. It can have breaking changes at any time.
# Do not use this from other collections or standalone plugins/modules!
from __future__ import annotations
import json
import os
import typing as t
from io import BytesIO
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.general.plugins.module_utils import _deps as deps
from ansible_collections.community.general.plugins.module_utils._version import LooseVersion
if t.TYPE_CHECKING:
from ansible.module_utils.basic import AnsibleModule
etree: t.Any = None
with deps.declare("lxml"):
from lxml import etree # type: ignore[no-redef]
COMMON_ARGUMENT_SPEC = dict(
path=dict(type="path", aliases=["dest", "file"]),
xmlstring=dict(type="str"),
xpath=dict(type="str"),
namespaces=dict(type="dict", default={}),
count=dict(type="bool", default=False),
print_match=dict(type="bool", default=False),
content=dict(type="str", choices=["attribute", "text"]),
strip_cdata_tags=dict(type="bool", default=False),
huge_tree=dict(type="bool", default=False),
)
def check_lxml(module: AnsibleModule) -> None:
deps.validate(module, "lxml")
version_str = ".".join(str(f) for f in etree.LXML_VERSION)
if LooseVersion(version_str) < LooseVersion("2.3.0"):
module.fail_json(msg="The xml ansible module requires lxml 2.3.0 or newer installed on the managed machine")
elif LooseVersion(version_str) < LooseVersion("3.0.0"):
module.warn("Using lxml version lower than 3.0.0 does not guarantee predictable element attribute order.")
def validate_xpath(module: AnsibleModule, xpath: str) -> None:
try:
etree.XPath(xpath)
except etree.XPathSyntaxError as e:
module.fail_json(msg=f"Syntax error in xpath expression: {xpath} ({e})")
except etree.XPathEvalError as e:
module.fail_json(msg=f"Evaluation error in xpath expression: {xpath} ({e})")
def parse_xml_doc(
module: AnsibleModule,
xml_file: str | None = None,
xml_string: str | None = None,
strip_cdata_tags: bool = False,
huge_tree: bool = False,
remove_blank_text: bool = False,
resolve_entities: bool = True,
) -> t.Any:
infile: t.IO[bytes] | None = None
try:
if xml_string:
infile = BytesIO(to_bytes(xml_string, errors="surrogate_or_strict"))
elif xml_file and os.path.isfile(xml_file):
infile = open(xml_file, "rb") # noqa: SIM115
else:
module.fail_json(msg=f"The target XML source '{xml_file}' does not exist.")
try:
parser = etree.XMLParser(
remove_blank_text=remove_blank_text,
strip_cdata=strip_cdata_tags,
huge_tree=huge_tree,
resolve_entities=resolve_entities,
)
doc = etree.parse(infile, parser)
except etree.XMLSyntaxError as e:
module.fail_json(msg=f"Error while parsing document: {xml_file or 'xml_string'} ({e})")
finally:
if infile:
infile.close()
return doc
def xpath_matches(tree: t.Any, xpath: str, namespaces: dict) -> bool:
"""Test if a node exists."""
return bool(tree.xpath(xpath, namespaces=namespaces))
def is_node(tree: t.Any, xpath: str, namespaces: dict) -> bool:
"""Test if a given xpath matches anything and if that match is a node."""
if xpath_matches(tree, xpath, namespaces):
match = tree.xpath(xpath, namespaces=namespaces)
if isinstance(match[0], etree._Element):
return True
return False
def get_matches(tree: t.Any, xpath: str, namespaces: dict) -> dict:
"""Return matched XPath paths."""
match = tree.xpath(xpath, namespaces=namespaces)
match_xpaths = [tree.getpath(m) for m in match]
match_str = json.dumps(match_xpaths)
msg = f"selector '{xpath}' match: {match_str}"
return {"msg": msg, "matches": match_xpaths}
def count_matches(tree: t.Any, xpath: str, namespaces: dict) -> dict:
"""Return the count of nodes matching the xpath."""
hits = len(tree.xpath(xpath, namespaces=namespaces))
msg = f"found {hits} nodes"
return {"msg": msg, "count": hits}
def collect_element_text(tree: t.Any, xpath: str, namespaces: dict) -> list | None:
"""Get text content of matched elements. Returns None if xpath does not match a node."""
if not is_node(tree, xpath, namespaces):
return None
return [{element.tag: element.text} for element in tree.xpath(xpath, namespaces=namespaces)]
def collect_element_attr(tree: t.Any, xpath: str, namespaces: dict) -> list | None:
"""Get attributes of matched elements. Returns None if xpath does not match a node."""
if not is_node(tree, xpath, namespaces):
return None
elements = []
for element in tree.xpath(xpath, namespaces=namespaces):
elements.append({element.tag: dict(element.attrib)})
return elements