1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00

modules [jk]*: use f-strings (#10970)

* modules [jk]*: use f-strings

* add changelog frag

* Apply suggestions from code review

* typing insanity
This commit is contained in:
Alexei Znamensky 2025-10-26 19:54:15 +13:00 committed by GitHub
parent 8120e9347e
commit 4a6a449fbd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
46 changed files with 363 additions and 408 deletions

View file

@ -386,7 +386,7 @@ class JenkinsPlugin(object):
def _csrf_enabled(self):
csrf_data = self._get_json_data(
"%s/%s" % (self.url, "api/json"), 'CSRF')
f"{self.url}/api/json", 'CSRF')
if 'useCrumbs' not in csrf_data:
self.module.fail_json(
@ -404,7 +404,7 @@ class JenkinsPlugin(object):
json_data = json.loads(to_native(r.read()))
except Exception as e:
self.module.fail_json(
msg="Cannot parse %s JSON data." % what,
msg=f"Cannot parse {what} JSON data.",
details=to_native(e))
return json_data
@ -412,16 +412,16 @@ class JenkinsPlugin(object):
def _get_urls_data(self, urls, what=None, msg_status=None, msg_exception=None, **kwargs):
# Compose default messages
if msg_status is None:
msg_status = "Cannot get %s" % what
msg_status = f"Cannot get {what}"
if msg_exception is None:
msg_exception = "Retrieval of %s failed." % what
msg_exception = f"Retrieval of {what} failed."
errors = {}
for url in urls:
err_msg = None
try:
self.module.debug("fetching url: %s" % url)
self.module.debug(f"fetching url: {url}")
is_jenkins_call = url.startswith(self.url)
self.module.params['force_basic_auth'] = is_jenkins_call
@ -433,11 +433,11 @@ class JenkinsPlugin(object):
if info['status'] == 200:
return response
else:
err_msg = ("%s. fetching url %s failed. response code: %s" % (msg_status, url, info['status']))
err_msg = f"{msg_status}. fetching url {url} failed. response code: {info['status']}"
if info['status'] > 400: # extend error message
err_msg = "%s. response body: %s" % (err_msg, info['body'])
err_msg = f"{err_msg}. response body: {info['body']}"
except Exception as e:
err_msg = "%s. fetching url %s failed. error msg: %s" % (msg_status, url, to_native(e))
err_msg = f"{msg_status}. fetching url {url} failed. error msg: {e}"
finally:
if err_msg is not None:
self.module.debug(err_msg)
@ -451,10 +451,10 @@ class JenkinsPlugin(object):
dont_fail=False, **kwargs):
# Compose default messages
if msg_status is None:
msg_status = "Cannot get %s" % what
msg_status = f"Cannot get {what}"
if msg_exception is None:
msg_exception = "Retrieval of %s failed." % what
msg_exception = f"Retrieval of {what} failed."
# Get the URL data
try:
@ -481,7 +481,7 @@ class JenkinsPlugin(object):
def _get_crumb(self):
crumb_data = self._get_json_data(
"%s/%s" % (self.url, "crumbIssuer/api/json"), 'Crumb')
f"{self.url}/crumbIssuer/api/json", 'Crumb')
if 'crumbRequestField' in crumb_data and 'crumb' in crumb_data:
self.crumb[crumb_data['crumbRequestField']] = crumb_data['crumb']
@ -492,7 +492,7 @@ class JenkinsPlugin(object):
def _get_installed_plugins(self):
plugins_data = self._get_json_data(
"%s/%s" % (self.url, "pluginManager/api/json?depth=1"),
f"{self.url}/pluginManager/api/json?depth=1",
'list of plugins')
# Check if we got valid data
@ -555,8 +555,7 @@ class JenkinsPlugin(object):
if not self.module.check_mode:
# Install the plugin (with dependencies)
install_script = (
'd = Jenkins.instance.updateCenter.getPlugin("%s")'
'.deploy(); d.get();' % self.params['name'])
f"""d = Jenkins.instance.updateCenter.getPlugin("{self.params['name']}").deploy(); d.get();""")
if self.params['with_dependencies']:
install_script = (
@ -571,15 +570,13 @@ class JenkinsPlugin(object):
# Send the installation request
r = self._get_url_data(
"%s/scriptText" % self.url,
f"{self.url}/scriptText",
msg_status="Cannot install plugin.",
msg_exception="Plugin installation has failed.",
data=data,
dont_fail=True)
hpi_file = '%s/plugins/%s.hpi' % (
self.params['jenkins_home'],
self.params['name'])
hpi_file = f"{self.params['jenkins_home']}/plugins/{self.params['name']}.hpi"
if os.path.isfile(hpi_file):
os.remove(hpi_file)
@ -587,9 +584,7 @@ class JenkinsPlugin(object):
def install(self):
changed = False
plugin_file = (
'%s/plugins/%s.jpi' % (
self.params['jenkins_home'],
self.params['name']))
f"{self.params['jenkins_home']}/plugins/{self.params['name']}.jpi")
if not self.is_installed and self.params['version'] in [None, 'latest']:
try:
@ -684,7 +679,7 @@ class JenkinsPlugin(object):
urls = []
for base_url in self.params['updates_url']:
for update_segment in self.params['latest_plugins_url_segments']:
urls.append("{0}/{1}/{2}.hpi".format(base_url, update_segment, self.params['name']))
urls.append(f"{base_url}/{update_segment}/{self.params['name']}.hpi")
return urls
def _get_latest_compatible_plugin_version(self, plugin_name=None):
@ -694,11 +689,11 @@ class JenkinsPlugin(object):
raw_version = info.get("x-jenkins")
self.jenkins_version = self.parse_version(raw_version)
name = plugin_name or self.params['name']
cache_path = "{}/ansible_jenkins_plugin_cache.json".format(self.params['jenkins_home'])
cache_path = f"{self.params['jenkins_home']}/ansible_jenkins_plugin_cache.json"
plugin_version_urls = []
for base_url in self.params['updates_url']:
for update_json in self.params['plugin_versions_url_segment']:
plugin_version_urls.append("{}/{}".format(base_url, update_json))
plugin_version_urls.append(f"{base_url}/{update_json}")
try: # Check if file is saved localy
if os.path.exists(cache_path):
@ -725,7 +720,7 @@ class JenkinsPlugin(object):
plugin_versions = plugin_data.get("plugins", {}).get(name)
if not plugin_versions:
self.module.fail_json(msg="Plugin '{}' not found.".format(name))
self.module.fail_json(msg=f"Plugin '{name}' not found.")
sorted_versions = list(reversed(plugin_versions.items()))
@ -735,22 +730,21 @@ class JenkinsPlugin(object):
return 'latest' if idx == 0 else version_title
self.module.warn(
"No compatible version found for plugin '{}'. "
"Installing latest version.".format(name))
f"No compatible version found for plugin '{name}'. Installing latest version.")
return 'latest'
def _get_versioned_plugin_urls(self):
urls = []
for base_url in self.params['updates_url']:
for versioned_segment in self.params['versioned_plugins_url_segments']:
urls.append("{0}/{1}/{2}/{3}/{2}.hpi".format(base_url, versioned_segment, self.params['name'], self.params['version']))
urls.append(f"{base_url}/{versioned_segment}/{self.params['name']}/{self.params['version']}/{self.params['name']}.hpi")
return urls
def _get_update_center_urls(self):
urls = []
for base_url in self.params['updates_url']:
for update_json in self.params['update_json_url_segment']:
urls.append("{0}/{1}".format(base_url, update_json))
urls.append(f"{base_url}/{update_json}")
return urls
def _get_versioned_dependencies(self):
@ -791,7 +785,7 @@ class JenkinsPlugin(object):
os.close(tmp_update_fd)
except IOError as e:
self.module.fail_json(
msg="Cannot close the tmp updates file %s." % tmp_updates_file,
msg=f"Cannot close the tmp updates file {tmp_updates_file}.",
details=to_native(e))
else:
tmp_updates_file = updates_file
@ -805,11 +799,11 @@ class JenkinsPlugin(object):
data = json.loads(f.readline())
except IOError as e:
self.module.fail_json(
msg="Cannot open%s updates file." % (" temporary" if tmp_updates_file != updates_file else ""),
msg=f"Cannot open{' temporary' if tmp_updates_file != updates_file else ''} updates file.",
details=to_native(e))
except Exception as e:
self.module.fail_json(
msg="Cannot load JSON data from the%s updates file." % (" temporary" if tmp_updates_file != updates_file else ""),
msg=f"Cannot load JSON data from the{' temporary' if tmp_updates_file != updates_file else ''} updates file.",
details=to_native(e))
# Move the updates file to the right place if we could read it
@ -843,7 +837,7 @@ class JenkinsPlugin(object):
os.close(tmp_f_fd)
except IOError as e:
self.module.fail_json(
msg='Cannot close the temporal plugin file %s.' % tmp_f,
msg=f'Cannot close the temporal plugin file {tmp_f}.',
details=to_native(e))
# Move the file onto the right place
@ -877,7 +871,7 @@ class JenkinsPlugin(object):
# Perform the action
if not self.module.check_mode:
self._pm_query(action, "%sning" % action.capitalize())
self._pm_query(action, f"{action.capitalize()}ning")
changed = True
@ -900,22 +894,21 @@ class JenkinsPlugin(object):
# Perform the action
if not self.module.check_mode:
self._pm_query(
"make%sd" % action.capitalize(),
"%sing" % action[:-1].capitalize())
f"make{action.capitalize()}d",
f"{action[:-1].capitalize()}ing")
changed = True
return changed
def _pm_query(self, action, msg):
url = "%s/pluginManager/plugin/%s/%s" % (
self.params['url'], self.params['name'], action)
url = f"{self.params['url']}/pluginManager/plugin/{self.params['name']}/{action}"
# Send the request
self._get_url_data(
url,
msg_status="Plugin not found. %s" % url,
msg_exception="%s has failed." % msg,
msg_status=f"Plugin not found. {url}",
msg_exception=f"{msg} has failed.",
method="POST")
@staticmethod
@ -971,7 +964,7 @@ def main():
module.params['timeout'] = float(module.params['timeout'])
except ValueError as e:
module.fail_json(
msg='Cannot convert %s to float.' % module.params['timeout'],
msg=f"Cannot convert {module.params['timeout']} to float.",
details=to_native(e))
# Instantiate the JenkinsPlugin object
jp = JenkinsPlugin(module)