1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00

modules ip*: use f-strings (#10968)

* modules ip*: use f-strings

* add changelog frag
This commit is contained in:
Alexei Znamensky 2025-10-25 13:54:37 +13:00 committed by GitHub
parent 0ef2235929
commit 0b6e99b28b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 80 additions and 73 deletions

View file

@ -0,0 +1,13 @@
minor_changes:
- ipa_getkeytab - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipa_host - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipa_otptoken - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipa_subca - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipa_user - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipbase_info - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipify_facts - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipinfoio_facts - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipmi_boot - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipmi_power - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- iptables_state - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).
- ipwcli_dns - use f-strings for string templating (https://github.com/ansible-collections/community.general/pull/10968).

View file

@ -218,7 +218,7 @@ def main():
try:
os.remove(path)
except OSError as e:
module.fail_json(msg="Error deleting: %s - %s." % (e.filename, e.strerror))
module.fail_json(msg=f"Error deleting: {e.filename} - {e.strerror}.")
keytab._exec()
changed = True
if force and module.check_mode:
@ -234,7 +234,7 @@ def main():
try:
os.remove(path)
except OSError as e:
module.fail_json(msg="Error deleting: %s - %s." % (e.filename, e.strerror))
module.fail_json(msg=f"Error deleting: {e.filename} - {e.strerror}.")
module.exit_json(changed=changed)

View file

@ -279,7 +279,7 @@ def ensure(module, client):
return changed, client.host_add(name=name, host=module_host)
else:
if state in ['disabled', 'enabled']:
module.fail_json(msg="No host with name " + ipa_host + " found")
module.fail_json(msg=f"No host with name {ipa_host} found")
diff = get_host_diff(client, ipa_host, module_host)
if len(diff) > 0:

View file

@ -237,9 +237,9 @@ def get_otptoken_dict(ansible_to_ipa, uniqueid=None, newuniqueid=None, otptype=N
if enabled is not None:
otptoken[ansible_to_ipa['enabled']] = False if enabled else True
if notbefore is not None:
otptoken[ansible_to_ipa['notbefore']] = notbefore + 'Z'
otptoken[ansible_to_ipa['notbefore']] = f"{notbefore}Z"
if notafter is not None:
otptoken[ansible_to_ipa['notafter']] = notafter + 'Z'
otptoken[ansible_to_ipa['notafter']] = f"{notafter}Z"
if vendor is not None:
otptoken[ansible_to_ipa['vendor']] = vendor
if model is not None:
@ -333,8 +333,7 @@ def validate_modifications(ansible_to_ipa, module, ipa_otptoken,
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]]
else:
if len(ipa_otptoken[ansible_to_ipa[parameter]]) != 1:
module.fail_json(msg=("Invariant fail: Return value from IPA is not a list " +
"of length 1. Please open a bug report for the module."))
module.fail_json(msg="Invariant fail: Return value from IPA is not a list of length 1. Please open a bug report for the module.")
if parameter == 'secretkey':
# We stored the secret key in base32 since we had assumed that would need to
# be the format if we were contacting IPA to create it. However, we are
@ -358,11 +357,9 @@ def validate_modifications(ansible_to_ipa, module, ipa_otptoken,
if mod_value != ipa_value:
modifications_valid = False
fail_message = ("Parameter '" + parameter + "' cannot be changed once " +
"the OTP is created and the requested value specified here (" +
str(mod_value) +
") differs from what is set in the IPA server ("
+ str(ipa_value) + ")")
fail_message = (f"Parameter '{parameter}' cannot be changed once " +
f"the OTP is created and the requested value specified here ({mod_value}) " +
f"differs from what is set in the IPA server ({ipa_value})")
module.fail_json(msg=fail_message)
return modifications_valid
@ -422,9 +419,8 @@ def ensure(module, client):
# Check to see if the new unique id is already taken in use
ipa_otptoken_new = client.otptoken_find(name=module_otptoken[ansible_to_ipa['newuniqueid']])
if ipa_otptoken_new:
module.fail_json(msg=("Requested rename through newuniqueid to " +
module_otptoken[ansible_to_ipa['newuniqueid']] +
" failed because the new unique id is already in use"))
module.fail_json(msg=(f"Requested rename through newuniqueid to {module_otptoken[ansible_to_ipa['newuniqueid']]} "
"failed because the new unique id is already in use"))
changed = False
if state == 'present':

View file

@ -104,7 +104,7 @@ class SubCAIPAClient(IPAClient):
for change in diff:
update_detail = dict()
if item[change] is not None:
update_detail.update(setattr="{0}={1}".format(change, item[change]))
update_detail.update(setattr=f"{change}={item[change]}")
self._post_json(method='ca_mod', name=subca_name, item=update_detail)
def subca_del(self, subca_name=None):

View file

@ -217,7 +217,7 @@ def get_user_dict(displayname=None, givenname=None, krbpasswordexpiration=None,
if displayname is not None:
user['displayname'] = displayname
if krbpasswordexpiration is not None:
user['krbpasswordexpiration'] = krbpasswordexpiration + "Z"
user['krbpasswordexpiration'] = f"{krbpasswordexpiration}Z"
if givenname is not None:
user['givenname'] = givenname
if loginshell is not None:
@ -305,12 +305,12 @@ def get_ssh_key_fingerprint(ssh_key, hash_algo='sha256'):
key_fp = ':'.join(a + b for a, b in zip(fp_plain[::2], fp_plain[1::2])).upper()
elif hash_algo == 'sha256':
fp_plain = base64.b64encode(hashlib.sha256(key).digest()).decode('ascii').rstrip('=')
key_fp = 'SHA256:{fp}'.format(fp=fp_plain)
key_fp = f'SHA256:{fp_plain}'
if len(parts) < 3:
return "%s (%s)" % (key_fp, key_type)
return f"{key_fp} ({key_type})"
else:
comment = parts[2]
return "%s %s (%s)" % (key_fp, comment, key_type)
return f"{key_fp} {comment} ({key_type})"
def ensure(module, client):

View file

@ -239,15 +239,14 @@ class IpbaseInfo(object):
})
if info['status'] != 200:
self.module.fail_json(msg='The API request to ipbase.com returned an error status code {0}'.format(info['status']))
self.module.fail_json(msg=f"The API request to ipbase.com returned an error status code {info['status']}")
else:
try:
content = response.read()
result = self.module.from_json(content.decode('utf8'))
except ValueError:
self.module.fail_json(
msg='Failed to parse the ipbase.com response: '
'{0} {1}'.format(url, content))
msg=f'Failed to parse the ipbase.com response: {url} {content}')
else:
return result
@ -274,7 +273,7 @@ class IpbaseInfo(object):
params['language'] = language
if params:
url += '?' + urlencode(params)
url += f"?{urlencode(params)}"
return self._get_url_data(url)

View file

@ -76,10 +76,10 @@ class IpifyFacts(object):
result = {
'ipify_public_ip': None
}
(response, info) = fetch_url(module=module, url=self.api_url + "?format=json", force=True, timeout=self.timeout)
(response, info) = fetch_url(module=module, url=f"{self.api_url}?format=json", force=True, timeout=self.timeout)
if not response:
module.fail_json(msg="No valid or no response from url %s within %s seconds (timeout)" % (self.api_url, self.timeout))
module.fail_json(msg=f"No valid or no response from url {self.api_url} within {self.timeout} seconds (timeout)")
data = json.loads(to_text(response.read()))
result['ipify_public_ip'] = data.get('ip')

View file

@ -100,16 +100,14 @@ class IpinfoioFacts(object):
try:
info['status'] == 200
except AssertionError:
self.module.fail_json(msg='Could not get {0} page, '
'check for connectivity!'.format(self.url))
self.module.fail_json(msg=f'Could not get {self.url} page, check for connectivity!')
else:
try:
content = response.read()
result = self.module.from_json(content.decode('utf8'))
except ValueError:
self.module.fail_json(
msg='Failed to parse the ipinfo.io response: '
'{0} {1}'.format(self.url, content))
msg=f'Failed to parse the ipinfo.io response: {self.url} {content}')
else:
return result

View file

@ -185,7 +185,7 @@ def main():
ipmi_cmd = command.Command(
bmc=name, userid=user, password=password, port=port, kg=key
)
module.debug('ipmi instantiated - name: "%s"' % name)
module.debug(f'ipmi instantiated - name: "{name}"')
current = ipmi_cmd.get_bootdev()
# uefimode may not supported by BMC, so use desired value as default
current.setdefault('uefimode', uefiboot)

View file

@ -214,7 +214,7 @@ def main():
ipmi_cmd = command.Command(
bmc=name, userid=user, password=password, port=port, kg=key
)
module.debug('ipmi instantiated - name: "%s"' % name)
module.debug(f'ipmi instantiated - name: "{name}"')
changed = False
if machine is None:

View file

@ -267,7 +267,8 @@ def write_state(b_path, lines, changed):
# Populate a temporary file
tmpfd, tmpfile = tempfile.mkstemp()
with os.fdopen(tmpfd, 'w') as f:
f.write("{0}\n".format("\n".join(lines)))
joined_lines = "\n".join(lines)
f.write(f"{joined_lines}\n")
# Prepare to copy temporary file to the final destination
if not os.path.exists(b_path):
@ -278,7 +279,7 @@ def write_state(b_path, lines, changed):
os.makedirs(b_destdir)
except Exception as err:
module.fail_json(
msg='Error creating %s: %s' % (destdir, to_native(err)),
msg=f'Error creating {destdir}: {err}',
initial_state=lines)
changed = True
@ -292,7 +293,7 @@ def write_state(b_path, lines, changed):
except Exception as err:
path = to_native(b_path, errors='surrogate_or_strict')
module.fail_json(
msg='Error saving state into %s: %s' % (path, to_native(err)),
msg=f'Error saving state into {path}: {err}',
initial_state=lines)
return changed
@ -311,9 +312,9 @@ def initialize_from_null_state(initializer, initcommand, fallbackcmd, table):
commandline += ['-t', table]
dummy = module.run_command(commandline, check_rc=True)
(rc, out, err) = module.run_command(initcommand, check_rc=True)
if '*%s' % table not in out.splitlines():
if f'*{table}' not in out.splitlines():
# The last resort.
iptables_input = '*%s\n:OUTPUT ACCEPT\nCOMMIT\n' % table
iptables_input = f'*{table}\n:OUTPUT ACCEPT\nCOMMIT\n'
dummy = module.run_command(fallbackcmd, data=iptables_input, check_rc=True)
(rc, out, err) = module.run_command(initcommand, check_rc=True)
@ -412,18 +413,18 @@ def main():
COMMANDARGS.extend(['--table', table])
if wait is not None:
TESTCOMMAND.extend(['--wait', '%d' % wait])
TESTCOMMAND.extend(['--wait', f'{wait}'])
if modprobe is not None:
b_modprobe = to_bytes(modprobe, errors='surrogate_or_strict')
if not os.path.exists(b_modprobe):
module.fail_json(msg="modprobe %s not found" % modprobe)
module.fail_json(msg=f"modprobe {modprobe} not found")
if not os.path.isfile(b_modprobe):
module.fail_json(msg="modprobe %s not a file" % modprobe)
module.fail_json(msg=f"modprobe {modprobe} not a file")
if not os.access(b_modprobe, os.R_OK):
module.fail_json(msg="modprobe %s not readable" % modprobe)
module.fail_json(msg=f"modprobe {modprobe} not readable")
if not os.access(b_modprobe, os.X_OK):
module.fail_json(msg="modprobe %s not executable" % modprobe)
module.fail_json(msg=f"modprobe {modprobe} not executable")
COMMANDARGS.extend(['--modprobe', modprobe])
INITIALIZER.extend(['--modprobe', modprobe])
INITCOMMAND.extend(['--modprobe', modprobe])
@ -437,11 +438,11 @@ def main():
if state == 'restored':
if not os.path.exists(b_path):
module.fail_json(msg="Source %s not found" % path)
module.fail_json(msg=f"Source {path} not found")
if not os.path.isfile(b_path):
module.fail_json(msg="Source %s not a file" % path)
module.fail_json(msg=f"Source {path} not a file")
if not os.access(b_path, os.R_OK):
module.fail_json(msg="Source %s not readable" % path)
module.fail_json(msg=f"Source {path} not readable")
state_to_restore = read_state(b_path)
cmd = None
else:
@ -461,16 +462,16 @@ def main():
if table is None:
if state == 'restored':
for t in TABLES:
if '*%s' % t in state_to_restore:
if len(stdout) == 0 or '*%s' % t not in stdout.splitlines():
if f'*{t}' in state_to_restore:
if len(stdout) == 0 or f'*{t}' not in stdout.splitlines():
(rc, stdout, stderr) = initialize_from_null_state(INITIALIZER, INITCOMMAND, FALLBACKCMD, t)
elif len(stdout) == 0:
(rc, stdout, stderr) = initialize_from_null_state(INITIALIZER, INITCOMMAND, FALLBACKCMD, 'filter')
elif state == 'restored' and '*%s' % table not in state_to_restore:
module.fail_json(msg="Table %s to restore not defined in %s" % (table, path))
elif state == 'restored' and f'*{table}' not in state_to_restore:
module.fail_json(msg=f"Table {table} to restore not defined in {path}")
elif len(stdout) == 0 or '*%s' % table not in stdout.splitlines():
elif len(stdout) == 0 or f'*{table}' not in stdout.splitlines():
(rc, stdout, stderr) = initialize_from_null_state(INITIALIZER, INITCOMMAND, FALLBACKCMD, table)
initial_state = filter_and_format_state(stdout)
@ -500,7 +501,7 @@ def main():
MAINCOMMAND.insert(0, bin_iptables_restore)
if wait is not None:
MAINCOMMAND.extend(['--wait', '%d' % wait])
MAINCOMMAND.extend(['--wait', f'{wait}'])
if _back is not None:
b_back = to_bytes(_back, errors='surrogate_or_strict')
@ -516,7 +517,7 @@ def main():
TESTCOMMAND = list(MAINCOMMAND)
TESTCOMMAND.insert(1, '--test')
error_msg = "Source %s is not suitable for input to %s" % (path, os.path.basename(bin_iptables_restore))
error_msg = f"Source {path} is not suitable for input to {os.path.basename(bin_iptables_restore)}"
# Due to a bug in iptables-nft-restore --test, we have to validate tables
# one by one (https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=960003).
@ -544,7 +545,8 @@ def main():
if module.check_mode:
tmpfd, tmpfile = tempfile.mkstemp()
with os.fdopen(tmpfd, 'w') as f:
f.write("{0}\n".format("\n".join(initial_state)))
joined_initial_state = "\n".join(initial_state)
f.write(f"{joined_initial_state}\n")
if filecmp.cmp(tmpfile, b_path):
restored_state = initial_state
@ -555,7 +557,7 @@ def main():
# Let time enough to the plugin to retrieve async status of the module
# in case of bad option type/value and the like.
if _back is not None:
b_starter = to_bytes('%s.starter' % _back, errors='surrogate_or_strict')
b_starter = to_bytes(f'{_back}.starter', errors='surrogate_or_strict')
while True:
if os.path.exists(b_starter):
os.remove(b_starter)
@ -633,8 +635,7 @@ def main():
tables_rollback = parse_per_table_state(stdout)
msg = (
"Failed to confirm state restored from %s after %ss. "
"Firewall has been rolled back to its initial state." % (path, _timeout)
f"Failed to confirm state restored from {path} after {_timeout}s. Firewall has been rolled back to its initial state."
)
module.fail_json(

View file

@ -188,51 +188,51 @@ class ResourceRecord(object):
def create_naptrrecord(self):
# create NAPTR record with the given params
record = ('naptrrecord %s -set ttl=%s;container=%s;order=%s;preference=%s;flags="%s";service="%s";replacement="%s"'
% (self.dnsname, self.ttl, self.container, self.order, self.preference, self.flags, self.service, self.replacement))
record = (f'naptrrecord {self.dnsname} -set ttl={self.ttl};container={self.container};order={self.order};'
f'preference={self.preference};flags="{self.flags}";service="{self.service}";replacement="{self.replacement}"')
return record
def create_srvrecord(self):
# create SRV record with the given params
record = ('srvrecord %s -set ttl=%s;container=%s;priority=%s;weight=%s;port=%s;target=%s'
% (self.dnsname, self.ttl, self.container, self.priority, self.weight, self.port, self.target))
record = (f'srvrecord {self.dnsname} -set ttl={self.ttl};container={self.container};priority={self.priority};'
f'weight={self.weight};port={self.port};target={self.target}')
return record
def create_arecord(self):
# create A record with the given params
if self.dnstype == 'AAAA':
record = 'aaaarecord %s %s -set ttl=%s;container=%s' % (self.dnsname, self.address, self.ttl, self.container)
record = f'aaaarecord {self.dnsname} {self.address} -set ttl={self.ttl};container={self.container}'
else:
record = 'arecord %s %s -set ttl=%s;container=%s' % (self.dnsname, self.address, self.ttl, self.container)
record = f'arecord {self.dnsname} {self.address} -set ttl={self.ttl};container={self.container}'
return record
def list_record(self, record):
# check if the record exists via list on ipwcli
search = 'list %s' % (record.replace(';', '&&').replace('set', 'where'))
search = f"list {record.replace(';', '&&').replace('set', 'where')}"
cmd = [
self.module.get_bin_path('ipwcli', True),
'-user=%s' % self.user,
'-password=%s' % self.password,
f'-user={self.user}',
f'-password={self.password}',
]
rc, out, err = self.module.run_command(cmd, data=search)
if 'Invalid username or password' in out:
self.module.fail_json(msg='access denied at ipwcli login: Invalid username or password')
if (('ARecord %s' % self.dnsname in out and rc == 0) or ('SRVRecord %s' % self.dnsname in out and rc == 0) or
('NAPTRRecord %s' % self.dnsname in out and rc == 0)):
if ((f'ARecord {self.dnsname}' in out and rc == 0) or (f'SRVRecord {self.dnsname}' in out and rc == 0) or
(f'NAPTRRecord {self.dnsname}' in out and rc == 0)):
return True, rc, out, err
return False, rc, out, err
def deploy_record(self, record):
# check what happens if create fails on ipworks
stdin = 'create %s' % (record)
stdin = f'create {record}'
cmd = [
self.module.get_bin_path('ipwcli', True),
'-user=%s' % self.user,
'-password=%s' % self.password,
f'-user={self.user}',
f'-password={self.password}',
]
rc, out, err = self.module.run_command(cmd, data=stdin)
@ -246,11 +246,11 @@ class ResourceRecord(object):
def delete_record(self, record):
# check what happens if create fails on ipworks
stdin = 'delete %s' % (record.replace(';', '&&').replace('set', 'where'))
stdin = f"delete {record.replace(';', '&&').replace('set', 'where')}"
cmd = [
self.module.get_bin_path('ipwcli', True),
'-user=%s' % self.user,
'-password=%s' % self.password,
f'-user={self.user}',
f'-password={self.password}',
]
rc, out, err = self.module.run_command(cmd, data=stdin)