mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-06-05 15:57:05 +00:00
This commit is contained in:
parent
c1c389e684
commit
c2485ea57b
2 changed files with 35 additions and 8 deletions
|
|
@ -151,17 +151,30 @@ def local_rpm_package_name(path):
|
|||
"""return package name of a local rpm passed in.
|
||||
Inspired by ansible.builtin.yum"""
|
||||
|
||||
header = get_local_rpm_header(path)
|
||||
if header is None:
|
||||
return None
|
||||
|
||||
return to_native(header[rpm.RPMTAG_NAME])
|
||||
|
||||
|
||||
def get_local_rpm_header(path):
|
||||
"""return rpm header of a local rpm file."""
|
||||
ts = rpm.TransactionSet()
|
||||
ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
|
||||
fd = os.open(path, os.O_RDONLY)
|
||||
try:
|
||||
header = ts.hdrFromFdno(fd)
|
||||
return ts.hdrFromFdno(fd)
|
||||
except rpm.error:
|
||||
return None
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
return to_native(header[rpm.RPMTAG_NAME])
|
||||
|
||||
def get_installed_rpm_header(name):
|
||||
"""return rpm header of an installed package."""
|
||||
ts = rpm.TransactionSet()
|
||||
return next(ts.dbMatch(rpm.RPMTAG_NAME, name), None)
|
||||
|
||||
|
||||
def query_package(module, name):
|
||||
|
|
@ -171,20 +184,31 @@ def query_package(module, name):
|
|||
return rc == 0
|
||||
|
||||
|
||||
def check_package_version(module, name):
|
||||
def check_package_version(module, name, local_rpm_path):
|
||||
# compare installed and candidate version
|
||||
# if newest version already installed return True
|
||||
# otherwise return False
|
||||
|
||||
rc, out, err = module.run_command([APT_CACHE, "policy", name], environ_update={"LANGUAGE": "C", "LC_ALL": "C"})
|
||||
installed = re.split("\n |: ", out)[2]
|
||||
candidate = re.split("\n |: ", out)[4]
|
||||
return installed >= candidate
|
||||
if local_rpm_path is not None:
|
||||
local_hdr = get_local_rpm_header(local_rpm_path)
|
||||
if local_hdr is None:
|
||||
module.fail_json(msg=f"Failed to read version from local RPM file: {local_rpm_path}")
|
||||
inst_hdr = get_installed_rpm_header(name)
|
||||
if inst_hdr is None:
|
||||
return False
|
||||
result = rpm.versionCompare(inst_hdr, local_hdr)
|
||||
return result >= 0
|
||||
else:
|
||||
rc, out, err = module.run_command([APT_CACHE, "policy", name], environ_update={"LANGUAGE": "C", "LC_ALL": "C"})
|
||||
installed = re.split("\n |: ", out)[2]
|
||||
candidate = re.split("\n |: ", out)[4]
|
||||
return installed >= candidate
|
||||
|
||||
|
||||
def query_package_provides(module, name, allow_upgrade=False):
|
||||
# rpm -q returns 0 if the package is installed,
|
||||
# 1 if it is not installed
|
||||
local_rpm_path = None
|
||||
if name.endswith(".rpm"):
|
||||
# Likely a local RPM file
|
||||
if not HAS_RPM_PYTHON:
|
||||
|
|
@ -193,13 +217,14 @@ def query_package_provides(module, name, allow_upgrade=False):
|
|||
exception=RPM_PYTHON_IMPORT_ERROR,
|
||||
)
|
||||
|
||||
local_rpm_path = name
|
||||
name = local_rpm_package_name(name)
|
||||
|
||||
rc, out, err = module.run_command([RPM_PATH, "-q", "--provides", name])
|
||||
if rc == 0:
|
||||
if not allow_upgrade:
|
||||
return True
|
||||
if check_package_version(module, name):
|
||||
if check_package_version(module, name, local_rpm_path):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue