From c2485ea57bbff5c1fafb9d86e473bef5f7c10de7 Mon Sep 17 00:00:00 2001 From: bne1hm <67783534+bne1hm@users.noreply.github.com> Date: Sun, 17 May 2026 07:48:08 +0000 Subject: [PATCH] apt_rpm: fix upgrade of local RPM not present in repository (#9161) (#12039) --- .../9161-apt-rpm-local-rpm-upgrade.yml | 2 + plugins/modules/apt_rpm.py | 41 +++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) create mode 100644 changelogs/fragments/9161-apt-rpm-local-rpm-upgrade.yml diff --git a/changelogs/fragments/9161-apt-rpm-local-rpm-upgrade.yml b/changelogs/fragments/9161-apt-rpm-local-rpm-upgrade.yml new file mode 100644 index 0000000000..dfad7371e4 --- /dev/null +++ b/changelogs/fragments/9161-apt-rpm-local-rpm-upgrade.yml @@ -0,0 +1,2 @@ +bugfixes: + - "apt_rpm - fix upgrade of local RPM not present in repository (https://github.com/ansible-collections/community.general/issues/9161, https://github.com/ansible-collections/community.general/pull/12039)." diff --git a/plugins/modules/apt_rpm.py b/plugins/modules/apt_rpm.py index 0674793d14..42ece0eaa1 100644 --- a/plugins/modules/apt_rpm.py +++ b/plugins/modules/apt_rpm.py @@ -151,17 +151,30 @@ def local_rpm_package_name(path): """return package name of a local rpm passed in. Inspired by ansible.builtin.yum""" + header = get_local_rpm_header(path) + if header is None: + return None + + return to_native(header[rpm.RPMTAG_NAME]) + + +def get_local_rpm_header(path): + """return rpm header of a local rpm file.""" ts = rpm.TransactionSet() ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES) fd = os.open(path, os.O_RDONLY) try: - header = ts.hdrFromFdno(fd) + return ts.hdrFromFdno(fd) except rpm.error: return None finally: os.close(fd) - return to_native(header[rpm.RPMTAG_NAME]) + +def get_installed_rpm_header(name): + """return rpm header of an installed package.""" + ts = rpm.TransactionSet() + return next(ts.dbMatch(rpm.RPMTAG_NAME, name), None) def query_package(module, name): @@ -171,20 +184,31 @@ def query_package(module, name): return rc == 0 -def check_package_version(module, name): +def check_package_version(module, name, local_rpm_path): # compare installed and candidate version # if newest version already installed return True # otherwise return False - rc, out, err = module.run_command([APT_CACHE, "policy", name], environ_update={"LANGUAGE": "C", "LC_ALL": "C"}) - installed = re.split("\n |: ", out)[2] - candidate = re.split("\n |: ", out)[4] - return installed >= candidate + if local_rpm_path is not None: + local_hdr = get_local_rpm_header(local_rpm_path) + if local_hdr is None: + module.fail_json(msg=f"Failed to read version from local RPM file: {local_rpm_path}") + inst_hdr = get_installed_rpm_header(name) + if inst_hdr is None: + return False + result = rpm.versionCompare(inst_hdr, local_hdr) + return result >= 0 + else: + rc, out, err = module.run_command([APT_CACHE, "policy", name], environ_update={"LANGUAGE": "C", "LC_ALL": "C"}) + installed = re.split("\n |: ", out)[2] + candidate = re.split("\n |: ", out)[4] + return installed >= candidate def query_package_provides(module, name, allow_upgrade=False): # rpm -q returns 0 if the package is installed, # 1 if it is not installed + local_rpm_path = None if name.endswith(".rpm"): # Likely a local RPM file if not HAS_RPM_PYTHON: @@ -193,13 +217,14 @@ def query_package_provides(module, name, allow_upgrade=False): exception=RPM_PYTHON_IMPORT_ERROR, ) + local_rpm_path = name name = local_rpm_package_name(name) rc, out, err = module.run_command([RPM_PATH, "-q", "--provides", name]) if rc == 0: if not allow_upgrade: return True - if check_package_version(module, name): + if check_package_version(module, name, local_rpm_path): return True return False