mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-04-23 20:29:08 +00:00
Replace % string formatting with f-strings across multiple plugins (#11908)
* Replace % string formatting with f-strings across multiple plugins Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * Add changelog fragment for PR 11908 Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
253ac45dd3
commit
53397c081a
14 changed files with 65 additions and 64 deletions
|
|
@ -184,8 +184,6 @@ def main():
|
|||
notify = module.params["notify"]
|
||||
|
||||
URI = f"https://{subscription}.campfirenow.com"
|
||||
NSTR = "<message><type>SoundMessage</type><body>%s</body></message>"
|
||||
MSTR = "<message><body>%s</body></message>"
|
||||
AGENT = "Ansible/1.2"
|
||||
|
||||
# Hack to add basic auth username and password the way fetch_url expects
|
||||
|
|
@ -197,14 +195,21 @@ def main():
|
|||
|
||||
# Send some audible notification if requested
|
||||
if notify:
|
||||
response, info = fetch_url(module, target_url, data=NSTR % html_escape(notify), headers=headers)
|
||||
response, info = fetch_url(
|
||||
module,
|
||||
target_url,
|
||||
data=f"<message><type>SoundMessage</type><body>{html_escape(notify)}</body></message>",
|
||||
headers=headers,
|
||||
)
|
||||
if info["status"] not in [200, 201]:
|
||||
module.fail_json(
|
||||
msg=f"unable to send msg: '{notify}', campfire api returned error code: '{info['status']}'"
|
||||
)
|
||||
|
||||
# Send the message
|
||||
response, info = fetch_url(module, target_url, data=MSTR % html_escape(msg), headers=headers)
|
||||
response, info = fetch_url(
|
||||
module, target_url, data=f"<message><body>{html_escape(msg)}</body></message>", headers=headers
|
||||
)
|
||||
if info["status"] not in [200, 201]:
|
||||
module.fail_json(msg=f"unable to send msg: '{msg}', campfire api returned error code: '{info['status']}'")
|
||||
|
||||
|
|
|
|||
|
|
@ -120,7 +120,6 @@ def decompress(b_src, b_dest, handler):
|
|||
|
||||
|
||||
class Decompress(ModuleHelper):
|
||||
destination_filename_template = "%s_decompressed"
|
||||
output_params = "dest"
|
||||
|
||||
module = dict(
|
||||
|
|
@ -189,7 +188,7 @@ class Decompress(ModuleHelper):
|
|||
if src.endswith(fmt_extension) and len(src) > len(fmt_extension):
|
||||
filename = src[: -len(fmt_extension)]
|
||||
else:
|
||||
filename = Decompress.destination_filename_template % src
|
||||
filename = f"{src}_decompressed"
|
||||
return filename
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -65,14 +65,12 @@ from urllib.parse import urlencode
|
|||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.urls import fetch_url
|
||||
|
||||
BASE_URL = "https://grove.io/api/notice/%s/"
|
||||
|
||||
# ==============================================================
|
||||
# do_notify_grove
|
||||
|
||||
|
||||
def do_notify_grove(module, channel_token, service, message, url=None, icon_url=None):
|
||||
my_url = BASE_URL % (channel_token,)
|
||||
my_url = f"https://grove.io/api/notice/{channel_token}/"
|
||||
|
||||
my_data = dict(service=service, message=message)
|
||||
if url is not None:
|
||||
|
|
|
|||
|
|
@ -391,10 +391,7 @@ def do_ini(
|
|||
within_section = not section
|
||||
section_start = section_end = 0
|
||||
msg = "OK"
|
||||
if no_extra_spaces:
|
||||
assignment_format = "%s=%s\n"
|
||||
else:
|
||||
assignment_format = "%s = %s\n"
|
||||
sep = "=" if no_extra_spaces else " = "
|
||||
|
||||
option_no_value_present = False
|
||||
|
||||
|
|
@ -453,7 +450,7 @@ def do_ini(
|
|||
option_no_value_present = True
|
||||
else:
|
||||
# replace existing option=value line(s)
|
||||
newline = assignment_format % (option, matched_value)
|
||||
newline = f"{option}{sep}{matched_value}\n"
|
||||
(changed, msg) = update_section_line(
|
||||
option, changed, section_lines, index, changed_lines, ignore_spaces, newline, msg
|
||||
)
|
||||
|
|
@ -472,7 +469,7 @@ def do_ini(
|
|||
if len(values) > 0:
|
||||
for index, line in enumerate(section_lines):
|
||||
if not changed_lines[index] and match_function(option, line):
|
||||
newline = assignment_format % (option, values.pop(0))
|
||||
newline = f"{option}{sep}{values.pop(0)}\n"
|
||||
(changed, msg) = update_section_line(
|
||||
option, changed, section_lines, index, changed_lines, ignore_spaces, newline, msg
|
||||
)
|
||||
|
|
@ -498,7 +495,7 @@ def do_ini(
|
|||
# otherwise some of their options might appear in reverse order for whatever fancy reason ¯\_(ツ)_/¯
|
||||
if element is not None:
|
||||
# insert option=value line
|
||||
section_lines.insert(index, assignment_format % (option, element))
|
||||
section_lines.insert(index, f"{option}{sep}{element}\n")
|
||||
msg = "option added"
|
||||
changed = True
|
||||
elif element is None and allow_no_value:
|
||||
|
|
@ -555,7 +552,7 @@ def do_ini(
|
|||
if condition["option"] != option:
|
||||
if len(condition["values"]) > 0:
|
||||
for value in condition["values"]:
|
||||
ini_lines.append(assignment_format % (condition["option"], value))
|
||||
ini_lines.append(f"{condition['option']}{sep}{value}\n")
|
||||
elif allow_no_value:
|
||||
ini_lines.append(f"{condition['option']}\n")
|
||||
elif not exclusive:
|
||||
|
|
@ -564,7 +561,7 @@ def do_ini(
|
|||
values.append(value)
|
||||
if option and values:
|
||||
for value in values:
|
||||
ini_lines.append(assignment_format % (option, value))
|
||||
ini_lines.append(f"{option}{sep}{value}\n")
|
||||
elif option and not values and allow_no_value:
|
||||
ini_lines.append(f"{option}\n")
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -107,7 +107,6 @@ except ImportError:
|
|||
|
||||
UUID_REQUIRED = "UUID of device is required for cmms_by_uuid command."
|
||||
CHASSIS_UUID_REQUIRED = "UUID of chassis is required for cmms_by_chassis_uuid command."
|
||||
SUCCESS_MSG = "Success %s result"
|
||||
|
||||
|
||||
def _cmms(module, lxca_con):
|
||||
|
|
@ -160,7 +159,7 @@ def execute_module(module):
|
|||
try:
|
||||
with connection_object(module) as lxca_con:
|
||||
result = FUNC_DICT[module.params["command_options"]](module, lxca_con)
|
||||
module.exit_json(changed=False, msg=SUCCESS_MSG % module.params["command_options"], result=result)
|
||||
module.exit_json(changed=False, msg=f"Success {module.params['command_options']} result", result=result)
|
||||
except Exception as exception:
|
||||
error_msg = "; ".join((e) for e in exception.args)
|
||||
module.fail_json(msg=error_msg, exception=traceback.format_exc())
|
||||
|
|
|
|||
|
|
@ -126,7 +126,6 @@ except ImportError:
|
|||
|
||||
UUID_REQUIRED = "UUID of device is required for nodes_by_uuid command."
|
||||
CHASSIS_UUID_REQUIRED = "UUID of chassis is required for nodes_by_chassis_uuid command."
|
||||
SUCCESS_MSG = "Success %s result"
|
||||
|
||||
|
||||
def _nodes(module, lxca_con):
|
||||
|
|
@ -192,7 +191,7 @@ def execute_module(module):
|
|||
try:
|
||||
with connection_object(module) as lxca_con:
|
||||
result = FUNC_DICT[module.params["command_options"]](module, lxca_con)
|
||||
module.exit_json(changed=False, msg=SUCCESS_MSG % module.params["command_options"], result=result)
|
||||
module.exit_json(changed=False, msg=f"Success {module.params['command_options']} result", result=result)
|
||||
except Exception as exception:
|
||||
error_msg = "; ".join(exception.args)
|
||||
module.fail_json(msg=error_msg, exception=traceback.format_exc())
|
||||
|
|
|
|||
|
|
@ -160,8 +160,6 @@ EXAMPLES = r"""
|
|||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.urls import fetch_url
|
||||
|
||||
ROCKETCHAT_INCOMING_WEBHOOK = "%s://%s/hooks/%s"
|
||||
|
||||
|
||||
def build_payload_for_rocketchat(
|
||||
module, text, channel, username, icon_url, icon_emoji, link_names, color, attachments, is_pre740
|
||||
|
|
@ -205,7 +203,7 @@ def do_notify_rocketchat(module, domain, token, protocol, payload, is_pre740):
|
|||
if token.count("/") < 1:
|
||||
module.fail_json(msg="Invalid Token specified, provide a valid token")
|
||||
|
||||
rocketchat_incoming_webhook = ROCKETCHAT_INCOMING_WEBHOOK % (protocol, domain, token)
|
||||
rocketchat_incoming_webhook = f"{protocol}://{domain}/hooks/{token}"
|
||||
|
||||
headers = None
|
||||
if not is_pre740:
|
||||
|
|
|
|||
|
|
@ -269,12 +269,6 @@ from urllib.parse import urlencode
|
|||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.urls import fetch_url
|
||||
|
||||
OLD_SLACK_INCOMING_WEBHOOK = "https://%s/services/hooks/incoming-webhook?token=%s"
|
||||
SLACK_INCOMING_WEBHOOK = "https://hooks.%s/services/%s"
|
||||
SLACK_POSTMESSAGE_WEBAPI = "https://%s/api/chat.postMessage"
|
||||
SLACK_UPDATEMESSAGE_WEBAPI = "https://%s/api/chat.update"
|
||||
SLACK_CONVERSATIONS_HISTORY_WEBAPI = "https://%s/api/conversations.history"
|
||||
|
||||
# Escaping quotes and apostrophes to avoid ending string prematurely in ansible call.
|
||||
# We do not escape other characters used as Slack metacharacters (e.g. &, <, >).
|
||||
escape_table = {
|
||||
|
|
@ -402,7 +396,7 @@ def get_slack_message(module, domain, token, channel, ts):
|
|||
}
|
||||
)
|
||||
domain = validate_slack_domain(domain)
|
||||
url = f"{SLACK_CONVERSATIONS_HISTORY_WEBAPI % domain}?{qs}"
|
||||
url = f"https://{domain}/api/conversations.history?{qs}"
|
||||
response, info = fetch_url(module=module, url=url, headers=headers, method="GET")
|
||||
if info["status"] != 200:
|
||||
module.fail_json(msg="failed to get slack message")
|
||||
|
|
@ -421,10 +415,10 @@ def do_notify_slack(module, domain, token, payload):
|
|||
if token.count("/") >= 2:
|
||||
# New style webhook token
|
||||
domain = validate_slack_domain(domain)
|
||||
slack_uri = SLACK_INCOMING_WEBHOOK % (domain, token)
|
||||
slack_uri = f"https://hooks.{domain}/services/{token}"
|
||||
elif re.match(r"^xox[abp]-\S+$", token):
|
||||
domain = validate_slack_domain(domain)
|
||||
slack_uri = (SLACK_UPDATEMESSAGE_WEBAPI if "ts" in payload else SLACK_POSTMESSAGE_WEBAPI) % domain
|
||||
slack_uri = f"https://{domain}/api/{'chat.update' if 'ts' in payload else 'chat.postMessage'}"
|
||||
use_webapi = True
|
||||
else:
|
||||
if not domain:
|
||||
|
|
@ -432,7 +426,7 @@ def do_notify_slack(module, domain, token, payload):
|
|||
msg="Slack has updated its webhook API. You need to specify a token of the form "
|
||||
"XXXX/YYYY/ZZZZ in your playbook"
|
||||
)
|
||||
slack_uri = OLD_SLACK_INCOMING_WEBHOOK % (domain, token)
|
||||
slack_uri = f"https://{domain}/services/hooks/incoming-webhook?token={token}"
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json; charset=UTF-8",
|
||||
|
|
@ -448,7 +442,7 @@ def do_notify_slack(module, domain, token, payload):
|
|||
if use_webapi:
|
||||
obscured_incoming_webhook = slack_uri
|
||||
else:
|
||||
obscured_incoming_webhook = SLACK_INCOMING_WEBHOOK % (domain, "[obscured]")
|
||||
obscured_incoming_webhook = f"https://hooks.{domain}/services/[obscured]"
|
||||
module.fail_json(msg=f" failed to send {data} to {obscured_incoming_webhook}: {info['msg']}")
|
||||
|
||||
# each API requires different handling
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue