1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-06-10 18:15:39 +00:00
This commit is contained in:
Alexei Znamensky 2026-06-06 01:40:29 -04:00 committed by GitHub
commit 5b27656539
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 182 additions and 150 deletions

View file

@ -0,0 +1,6 @@
minor_changes:
- "pritunl_org - replace ``module.params.get()`` with subscript access (https://github.com/ansible-collections/community.general/pull/12104)."
- "pritunl_org_info - replace ``module.params.get()`` with subscript access (https://github.com/ansible-collections/community.general/pull/12104)."
- "pritunl_user - replace ``module.params.get()`` with subscript access (https://github.com/ansible-collections/community.general/pull/12104)."
- "pritunl_user_info - replace ``module.params.get()`` with subscript access (https://github.com/ansible-collections/community.general/pull/12104)."
- "vexata_volume - replace ``module.params.get()`` with subscript access (https://github.com/ansible-collections/community.general/pull/12104)."

View file

@ -96,10 +96,10 @@ def ecs_argument_spec():
def get_acs_connection_info(params):
ecs_params = dict(
acs_access_key_id=params.get("alicloud_access_key"),
acs_secret_access_key=params.get("alicloud_secret_key"),
security_token=params.get("alicloud_security_token"),
ecs_role_name=params.get("ecs_role_name"),
acs_access_key_id=params["alicloud_access_key"],
acs_secret_access_key=params["alicloud_secret_key"],
security_token=params["alicloud_security_token"],
ecs_role_name=params["ecs_role_name"],
user_agent="Ansible-Provider-Alicloud",
)
return ecs_params
@ -123,28 +123,28 @@ def get_assume_role(params):
"""Return new params"""
sts_params = get_acs_connection_info(params)
assume_role = {}
if params.get("assume_role"):
if params["assume_role"]:
assume_role["alicloud_assume_role_arn"] = params["assume_role"].get("role_arn")
assume_role["alicloud_assume_role_session_name"] = params["assume_role"].get("session_name")
assume_role["alicloud_assume_role_session_expiration"] = params["assume_role"].get("session_expiration")
assume_role["alicloud_assume_role_policy"] = params["assume_role"].get("policy")
assume_role_params = {
"role_arn": params.get("alicloud_assume_role_arn")
if params.get("alicloud_assume_role_arn")
"role_arn": params["alicloud_assume_role_arn"]
if params["alicloud_assume_role_arn"]
else assume_role.get("alicloud_assume_role_arn"),
"role_session_name": params.get("alicloud_assume_role_session_name")
if params.get("alicloud_assume_role_session_name")
"role_session_name": params["alicloud_assume_role_session_name"]
if params["alicloud_assume_role_session_name"]
else assume_role.get("alicloud_assume_role_session_name"),
"duration_seconds": params.get("alicloud_assume_role_session_expiration")
if params.get("alicloud_assume_role_session_expiration")
"duration_seconds": params["alicloud_assume_role_session_expiration"]
if params["alicloud_assume_role_session_expiration"]
else assume_role.get("alicloud_assume_role_session_expiration", 3600),
"policy": assume_role.get("alicloud_assume_role_policy", {}),
}
try:
sts = (
connect_to_acs(footmark.sts, params.get("alicloud_region"), **sts_params)
connect_to_acs(footmark.sts, params["alicloud_region"], **sts_params)
.assume_role(**assume_role_params)
.read()
)
@ -201,7 +201,7 @@ def get_profile(params):
params["alicloud_assume_role_session_expiration"] = auth.get("expired_seconds")
params["alicloud_region"] = auth.get("region_id")
params = get_assume_role(params)
elif params.get("alicloud_assume_role_arn") or params.get("assume_role"):
elif params["alicloud_assume_role_arn"] or params["assume_role"]:
params = get_assume_role(params)
else:
params = get_acs_connection_info(params)
@ -212,7 +212,7 @@ def ecs_connect(module: AnsibleModule):
"""Return an ecs connection"""
ecs_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
ecs = connect_to_acs(footmark.ecs, region, **ecs_params)
@ -226,7 +226,7 @@ def slb_connect(module: AnsibleModule):
"""Return an slb connection"""
slb_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
slb = connect_to_acs(footmark.slb, region, **slb_params)
@ -240,7 +240,7 @@ def dns_connect(module: AnsibleModule):
"""Return an dns connection"""
dns_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
dns = connect_to_acs(footmark.dns, region, **dns_params)
@ -254,7 +254,7 @@ def vpc_connect(module: AnsibleModule):
"""Return an vpc connection"""
vpc_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
vpc = connect_to_acs(footmark.vpc, region, **vpc_params)
@ -268,7 +268,7 @@ def rds_connect(module: AnsibleModule):
"""Return an rds connection"""
rds_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
rds = connect_to_acs(footmark.rds, region, **rds_params)
@ -282,7 +282,7 @@ def ess_connect(module: AnsibleModule):
"""Return an ess connection"""
ess_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
ess = connect_to_acs(footmark.ess, region, **ess_params)
@ -296,7 +296,7 @@ def sts_connect(module: AnsibleModule):
"""Return an sts connection"""
sts_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
sts = connect_to_acs(footmark.sts, region, **sts_params)
@ -310,7 +310,7 @@ def ram_connect(module: AnsibleModule):
"""Return an ram connection"""
ram_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
ram = connect_to_acs(footmark.ram, region, **ram_params)
@ -324,7 +324,7 @@ def market_connect(module: AnsibleModule):
"""Return an market connection"""
market_params = get_profile(module.params)
# If we have a region specified, connect to its endpoint.
region = module.params.get("alicloud_region")
region = module.params["alicloud_region"]
if region:
try:
market = connect_to_acs(footmark.market, region, **market_params)

View file

@ -289,7 +289,7 @@ class _ConsulModule:
# Remove values that are None
params = {k: v for k, v in params.items() if v is not None}
ca_path = module_params.get("ca_path")
ca_path = module_params.get("ca_path") # UNCERTAIN: 'module_params' might be a params dict — check callers
base_url = f"{module_params['scheme']}://{module_params['host']}:{module_params['port']}/v1"
url = "/".join([base_url] + list(url_parts))

View file

@ -398,7 +398,7 @@ def build_path(module: AnsibleModule, path, kv=None):
else:
if n in module.params:
v[n] = str(module.params.get(n))
v[n] = str(module.params[n])
else:
v[n] = ""

View file

@ -55,7 +55,7 @@ class IPAClient:
self.protocol = protocol
self.module = module
self.headers = None
self.timeout = module.params.get("ipa_timeout")
self.timeout = module.params["ipa_timeout"]
self.use_gssapi = False
def get_base_url(self) -> str:
@ -178,7 +178,7 @@ class IPAClient:
def get_diff(self, ipa_data, module_data):
result = []
for key in module_data.keys():
mod_value = module_data.get(key, None)
mod_value = module_data.get(key, None) # UNCERTAIN: 'module_data' might be a params dict — check callers
if isinstance(mod_value, list):
default = []
else:

View file

@ -210,11 +210,11 @@ def _token_request(module_params: dict[str, t.Any], payload: dict[str, t.Any]) -
base_url = module_params["auth_keycloak_url"]
if not base_url.lower().startswith(("http", "https")):
raise KeycloakError(f"auth_url '{base_url}' should either start with 'http' or 'https'.")
auth_realm = module_params.get("auth_realm")
auth_realm = module_params["auth_realm"]
auth_url = URL_TOKEN.format(url=base_url, realm=auth_realm)
http_agent = module_params.get("http_agent")
validate_certs = module_params.get("validate_certs")
connection_timeout = module_params.get("connection_timeout")
http_agent = module_params["http_agent"]
validate_certs = module_params["validate_certs"]
connection_timeout = module_params["connection_timeout"]
try:
r = json.loads(
@ -243,10 +243,10 @@ def _request_token_using_credentials(module_params: dict[str, t.Any]) -> str:
:param module_params: parameters of the module. Must include 'auth_username' and 'auth_password'.
:return: connection header
"""
client_id = module_params.get("auth_client_id")
auth_username = module_params.get("auth_username")
auth_password = module_params.get("auth_password")
client_secret = module_params.get("auth_client_secret")
client_id = module_params["auth_client_id"]
auth_username = module_params["auth_username"]
auth_password = module_params["auth_password"]
client_secret = module_params["auth_client_secret"]
temp_payload = {
"grant_type": "password",
@ -267,9 +267,9 @@ def _request_token_using_refresh_token(module_params: dict[str, t.Any]) -> str:
:param module_params: parameters of the module. Must include 'refresh_token'.
:return: connection header
"""
client_id = module_params.get("auth_client_id")
refresh_token = module_params.get("refresh_token")
client_secret = module_params.get("auth_client_secret")
client_id = module_params["auth_client_id"]
refresh_token = module_params["refresh_token"]
client_secret = module_params["auth_client_secret"]
temp_payload = {
"grant_type": "refresh_token",
@ -292,8 +292,8 @@ def _request_token_using_client_credentials(module_params: dict[str, t.Any]) ->
and 'auth_client_secret'..
:return: connection header
"""
client_id = module_params.get("auth_client_id")
client_secret = module_params.get("auth_client_secret")
client_id = module_params["auth_client_id"]
client_secret = module_params["auth_client_secret"]
temp_payload = {
"grant_type": "client_credentials",
@ -312,12 +312,12 @@ def get_token(module_params: dict[str, t.Any]) -> dict[str, str]:
:param module_params: parameters of the module
:return: connection header
"""
token = module_params.get("token")
token = module_params["token"]
if token is None:
auth_client_id = module_params.get("auth_client_id")
auth_client_secret = module_params.get("auth_client_secret")
auth_username = module_params.get("auth_username")
auth_client_id = module_params["auth_client_id"]
auth_client_secret = module_params["auth_client_secret"]
auth_username = module_params["auth_username"]
if auth_client_id is not None and auth_client_secret is not None and auth_username is None:
token = _request_token_using_client_credentials(module_params)
else:
@ -395,11 +395,11 @@ class KeycloakAPI:
def __init__(self, module: AnsibleModule, connection_header: dict[str, str]) -> None:
self.module = module
self.baseurl = self.module.params.get("auth_keycloak_url")
self.validate_certs = self.module.params.get("validate_certs")
self.connection_timeout = self.module.params.get("connection_timeout")
self.baseurl = self.module.params["auth_keycloak_url"]
self.validate_certs = self.module.params["validate_certs"]
self.connection_timeout = self.module.params["connection_timeout"]
self.restheaders = connection_header
self.http_agent = self.module.params.get("http_agent")
self.http_agent = self.module.params["http_agent"]
def _request(
self, url: str, method: str, data: str | bytes | None = None, *, extra_headers: dict[str, str] | None = None
@ -443,7 +443,7 @@ class KeycloakAPI:
if isinstance(r, Exception):
# Try to refresh token and retry, if available
refresh_token = self.module.params.get("refresh_token")
refresh_token = self.module.params["refresh_token"]
if refresh_token is not None:
try:
token = _request_token_using_refresh_token(self.module.params)
@ -457,8 +457,8 @@ class KeycloakAPI:
if isinstance(r, Exception):
# Try to re-auth with username/password, if available
auth_username = self.module.params.get("auth_username")
auth_password = self.module.params.get("auth_password")
auth_username = self.module.params["auth_username"]
auth_password = self.module.params["auth_password"]
if auth_username is not None and auth_password is not None:
token = _request_token_using_credentials(self.module.params)
self.restheaders["Authorization"] = f"Bearer {token}"
@ -467,8 +467,8 @@ class KeycloakAPI:
if isinstance(r, Exception):
# Try to re-auth with client_id and client_secret, if available
auth_client_id = self.module.params.get("auth_client_id")
auth_client_secret = self.module.params.get("auth_client_secret")
auth_client_id = self.module.params["auth_client_id"]
auth_client_secret = self.module.params["auth_client_secret"]
if auth_client_id is not None and auth_client_secret is not None:
try:
token = _request_token_using_client_credentials(self.module.params)

View file

@ -61,9 +61,9 @@ def keycloak_clientsecret_module_resolve_params(module: AnsibleModule, kc: Keycl
:return: tuple of id, realm
"""
realm = module.params.get("realm")
id = module.params.get("id")
client_id = module.params.get("client_id")
realm = module.params["realm"]
id = module.params["id"]
client_id = module.params["client_id"]
# only lookup the client_id if id isn't provided.
# in the case that both are provided, prefer the ID, since it is one

View file

@ -16,7 +16,7 @@ class StateMixin:
default_state: str | None = None
def _state(self) -> str:
state: str = self.module.params.get(self.state_param) # type: ignore[attr-defined]
state: str = self.module.params[self.state_param] # type: ignore[attr-defined]
return self.default_state if state is None else state
def _method(self, state: str) -> str:

View file

@ -217,14 +217,14 @@ class OneViewModuleBase(metaclass=abc.ABCMeta):
self._check_hpe_oneview_sdk()
self._create_oneview_client()
self.state = self.module.params.get("state")
self.data = self.module.params.get("data")
self.state = self.module.params["state"]
self.data = self.module.params["data"]
# Preload params for get_all - used by facts
self.facts_params = self.module.params.get("params") or {}
self.facts_params = self.module.params["params"] or {}
# Preload options as dict - used by facts
self.options = transform_list_to_dict(self.module.params.get("options"))
self.options = transform_list_to_dict(self.module.params["options"])
self.validate_etag_support = validate_etag_support
@ -245,7 +245,7 @@ class OneViewModuleBase(metaclass=abc.ABCMeta):
self.module.fail_json(msg=missing_required_lib("hpOneView"), exception=HPE_ONEVIEW_IMP_ERR)
def _create_oneview_client(self):
if self.module.params.get("hostname"):
if self.module.params["hostname"]:
config = dict(
ip=self.module.params["hostname"],
credentials=dict(userName=self.module.params["username"], password=self.module.params["password"]),
@ -281,7 +281,7 @@ class OneViewModuleBase(metaclass=abc.ABCMeta):
"""
try:
if self.validate_etag_support:
if not self.module.params.get("validate_etag"):
if not self.module.params["validate_etag"]:
self.oneview_client.connection.disable_etag_validation()
result = self.execute_module()

View file

@ -76,7 +76,7 @@ class Online:
def __init__(self, module: AnsibleModule) -> None:
self.module = module
self.headers = {
"Authorization": f"Bearer {self.module.params.get('api_token')}",
"Authorization": f"Bearer {self.module.params['api_token']}",
"User-Agent": self.get_user_agent_string(module),
"Content-type": "application/json",
}
@ -86,7 +86,7 @@ class Online:
results = self.get(f"/{self.name}")
if not results.ok:
raise OnlineException(
f"Error fetching {self.name} ({self.module.params.get('api_url')}/{self.name}) [{results.status_code}: {results.json['message']}]"
f"Error fetching {self.name} ({self.module.params['api_url']}/{self.name}) [{results.status_code}: {results.json['message']}]"
)
return results.json
@ -94,7 +94,7 @@ class Online:
def _url_builder(self, path):
if path[0] == "/":
path = path[1:]
return f"{self.module.params.get('api_url')}/{path}"
return f"{self.module.params['api_url']}/{path}"
def send(self, method, path, data=None, headers=None):
url = self._url_builder(path)
@ -109,7 +109,7 @@ class Online:
data=data,
headers=self.headers,
method=method,
timeout=self.module.params.get("api_timeout"),
timeout=self.module.params["api_timeout"],
)
# Exceptions in fetch_url may result in a status -1, the ensures a proper error to the user in all cases

View file

@ -129,24 +129,24 @@ class OpenNebulaModule:
if not HAS_PYONE:
self.fail("pyone is required for this module")
if self.module.params.get("api_url"):
url = self.module.params.get("api_url")
if self.module.params["api_url"]:
url = self.module.params["api_url"]
else:
self.fail("Either api_url or the environment variable ONE_URL must be provided")
if self.module.params.get("api_username"):
username = self.module.params.get("api_username")
if self.module.params["api_username"]:
username = self.module.params["api_username"]
else:
self.fail("Either api_username or the environment variable ONE_USERNAME must be provided")
if self.module.params.get("api_password"):
password = self.module.params.get("api_password")
if self.module.params["api_password"]:
password = self.module.params["api_password"]
else:
self.fail("Either api_password or the environment variable ONE_PASSWORD must be provided")
session = f"{username}:{password}"
if not self.module.params.get("validate_certs") and "PYTHONHTTPSVERIFY" not in environ:
if not self.module.params["validate_certs"] and "PYTHONHTTPSVERIFY" not in environ:
return OneServer(url, session=session, context=no_ssl_validation_context)
else:
return OneServer(url, session)
@ -191,7 +191,7 @@ class OpenNebulaModule:
if "cluster_name" in self.module.params:
clusters = self.one.clusterpool.info()
for cluster in clusters.CLUSTER:
if self.module.params.get("cluster_name") == cluster.NAME:
if self.module.params["cluster_name"] == cluster.NAME:
resolved_params["cluster_id"] = cluster.ID
return resolved_params
@ -331,7 +331,7 @@ class OpenNebulaModule:
"""
if not wait_timeout:
wait_timeout = self.module.params.get("wait_timeout")
wait_timeout = self.module.params["wait_timeout"]
start_time = time.time()

View file

@ -44,10 +44,10 @@ def get_pritunl_settings(module: AnsibleModule) -> dict[str, t.Any]:
Helper function to set required Pritunl request params from module arguments.
"""
return {
"api_token": module.params.get("pritunl_api_token"),
"api_secret": module.params.get("pritunl_api_secret"),
"base_url": module.params.get("pritunl_url"),
"validate_certs": module.params.get("validate_certs"),
"api_token": module.params["pritunl_api_token"],
"api_secret": module.params["pritunl_api_secret"],
"base_url": module.params["pritunl_url"],
"validate_certs": module.params["validate_certs"],
}

View file

@ -76,10 +76,10 @@ class RedfishUtils:
self.resource_id = resource_id
self.data_modification = data_modification
self.strip_etag_quotes = strip_etag_quotes
self.ciphers = ciphers if ciphers is not None else module.params.get("ciphers")
self.ciphers = ciphers if ciphers is not None else module.params["ciphers"]
self._vendor = None
self.validate_certs = module.params.get("validate_certs", False)
self.ca_path = module.params.get("ca_path")
self.validate_certs = module.params["validate_certs"]
self.ca_path = module.params["ca_path"]
def _auth_params(self, headers: dict[str, str]) -> tuple[str | None, str | None, bool]:
"""
@ -1933,11 +1933,15 @@ class RedfishUtils:
return operation_results
def simple_update(self, update_opts):
image_uri = update_opts.get("update_image_uri")
protocol = update_opts.get("update_protocol")
targets = update_opts.get("update_targets")
creds = update_opts.get("update_creds")
apply_time = update_opts.get("update_apply_time")
image_uri = update_opts.get(
"update_image_uri"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
protocol = update_opts.get("update_protocol") # UNCERTAIN: 'update_opts' might be a params dict — check callers
targets = update_opts.get("update_targets") # UNCERTAIN: 'update_opts' might be a params dict — check callers
creds = update_opts.get("update_creds") # UNCERTAIN: 'update_opts' might be a params dict — check callers
apply_time = update_opts.get(
"update_apply_time"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
if not image_uri:
return {"ret": False, "msg": "Must specify update_image_uri for the SimpleUpdate command"}
@ -2005,13 +2009,25 @@ class RedfishUtils:
:param update_opts: The parameters for the update operation
:return: dict containing the response of the update request
"""
image_file = update_opts.get("update_image_file")
targets = update_opts.get("update_targets")
apply_time = update_opts.get("update_apply_time")
oem_params = update_opts.get("update_oem_params")
custom_oem_header = update_opts.get("update_custom_oem_header")
custom_oem_mime_type = update_opts.get("update_custom_oem_mime_type")
custom_oem_params = update_opts.get("update_custom_oem_params")
image_file = update_opts.get(
"update_image_file"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
targets = update_opts.get("update_targets") # UNCERTAIN: 'update_opts' might be a params dict — check callers
apply_time = update_opts.get(
"update_apply_time"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
oem_params = update_opts.get(
"update_oem_params"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
custom_oem_header = update_opts.get(
"update_custom_oem_header"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
custom_oem_mime_type = update_opts.get(
"update_custom_oem_mime_type"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
custom_oem_params = update_opts.get(
"update_custom_oem_params"
) # UNCERTAIN: 'update_opts' might be a params dict — check callers
# Ensure the image file is provided
if not image_file:
@ -2206,7 +2222,9 @@ class RedfishUtils:
# Build boot device list
boot_device_list = []
for ref in boot_order:
boot_device_list.append(boot_options_dict.get(ref, {"BootOptionReference": ref}))
boot_device_list.append(
boot_options_dict.get(ref, {"BootOptionReference": ref})
) # UNCERTAIN: 'boot_options_dict' might be a params dict — check callers
result["entries"] = boot_device_list
return result
@ -2279,11 +2297,15 @@ class RedfishUtils:
def set_boot_override_with_settings_uri(self, boot_opts):
# Extract the requested boot override options
bootdevice = boot_opts.get("bootdevice")
uefi_target = boot_opts.get("uefi_target")
boot_next = boot_opts.get("boot_next")
override_enabled = boot_opts.get("override_enabled")
boot_override_mode = boot_opts.get("boot_override_mode")
bootdevice = boot_opts.get("bootdevice") # UNCERTAIN: 'boot_opts' might be a params dict — check callers
uefi_target = boot_opts.get("uefi_target") # UNCERTAIN: 'boot_opts' might be a params dict — check callers
boot_next = boot_opts.get("boot_next") # UNCERTAIN: 'boot_opts' might be a params dict — check callers
override_enabled = boot_opts.get(
"override_enabled"
) # UNCERTAIN: 'boot_opts' might be a params dict — check callers
boot_override_mode = boot_opts.get(
"boot_override_mode"
) # UNCERTAIN: 'boot_opts' might be a params dict — check callers
if not bootdevice and override_enabled != "Disabled":
return {"ret": False, "msg": "bootdevice option required for temporary boot override"}
@ -2367,11 +2389,15 @@ class RedfishUtils:
def set_boot_override(self, boot_opts):
# Extract the requested boot override options
bootdevice = boot_opts.get("bootdevice")
uefi_target = boot_opts.get("uefi_target")
boot_next = boot_opts.get("boot_next")
override_enabled = boot_opts.get("override_enabled")
boot_override_mode = boot_opts.get("boot_override_mode")
bootdevice = boot_opts.get("bootdevice") # UNCERTAIN: 'boot_opts' might be a params dict — check callers
uefi_target = boot_opts.get("uefi_target") # UNCERTAIN: 'boot_opts' might be a params dict — check callers
boot_next = boot_opts.get("boot_next") # UNCERTAIN: 'boot_opts' might be a params dict — check callers
override_enabled = boot_opts.get(
"override_enabled"
) # UNCERTAIN: 'boot_opts' might be a params dict — check callers
boot_override_mode = boot_opts.get(
"boot_override_mode"
) # UNCERTAIN: 'boot_opts' might be a params dict — check callers
if not bootdevice and override_enabled != "Disabled":
return {"ret": False, "msg": "bootdevice option required for temporary boot override"}
@ -3070,16 +3096,16 @@ class RedfishUtils:
@staticmethod
def _insert_virt_media_payload(options, param_map, data, ai):
payload = {"Image": options.get("image_url")}
payload = {"Image": options["image_url"]}
for param, option in param_map.items():
if options.get(option) is not None and param in data:
if options[option] is not None and param in data:
allowable = ai.get(param, {}).get("AllowableValues", [])
if allowable and options.get(option) not in allowable:
if allowable and options[option] not in allowable:
return {
"ret": False,
"msg": f"Value '{options.get(option)}' specified for option '{option}' not in list of AllowableValues {allowable}",
"msg": f"Value '{options[option]}' specified for option '{option}' not in list of AllowableValues {allowable}",
}
payload[param] = options.get(option)
payload[param] = options[option]
return payload
def virtual_media_insert_via_patch(self, options, param_map, uri, data, image_only=False):
@ -3115,10 +3141,10 @@ class RedfishUtils:
"TransferProtocolType": "transfer_protocol_type",
"TransferMethod": "transfer_method",
}
image_url = options.get("image_url")
image_url = options["image_url"]
if not image_url:
return {"ret": False, "msg": "image_url option required for VirtualMediaInsert"}
media_types = options.get("media_types")
media_types = options["media_types"]
# locate and read the VirtualMedia resources
# Given resource_type, use the proper URI
@ -3220,7 +3246,7 @@ class RedfishUtils:
return resp
def virtual_media_eject(self, options, resource_type="Manager"):
image_url = options.get("image_url")
image_url = options["image_url"]
if not image_url:
return {"ret": False, "msg": "image_url option required for VirtualMediaEject"}

View file

@ -214,8 +214,8 @@ class Response:
class Scaleway:
def __init__(self, module: AnsibleModule) -> None:
self.module = module
oauth_token = self.module.params.get("api_token")
scw_profile = self.module.params.get("profile")
oauth_token = self.module.params["api_token"]
scw_profile = self.module.params["profile"]
if scw_profile:
if YAML_IMPORT_ERROR is not None:
@ -243,7 +243,7 @@ class Scaleway:
api_response = results.json if results.json is not None else results.body
message = api_response.get("message") if isinstance(api_response, dict) else api_response
raise ScalewayException(
f"Error fetching {self.name} ({self.module.params.get('api_url')}/{self.name}) [{results.status_code}: {message}]"
f"Error fetching {self.name} ({self.module.params['api_url']}/{self.name}) [{results.status_code}: {message}]"
)
if results.json is None:
@ -252,14 +252,14 @@ class Scaleway:
return results.json.get(self.name)
def _url_builder(self, path, params):
d = self.module.params.get("query_parameters")
d = self.module.params["query_parameters"]
if params is not None:
d.update(params)
query_string = urlencode(d, doseq=True)
if path[0] == "/":
path = path[1:]
return f"{self.module.params.get('api_url')}/{path}?{query_string}"
return f"{self.module.params['api_url']}/{path}?{query_string}"
def send(self, method, path, data=None, headers=None, params=None):
url = self._url_builder(path=path, params=params)
@ -277,7 +277,7 @@ class Scaleway:
data=data,
headers=self.headers,
method=method,
timeout=self.module.params.get("api_timeout"),
timeout=self.module.params["api_timeout"],
)
# Exceptions in fetch_url may result in a status -1, the ensures a proper error to the user in all cases

View file

@ -90,14 +90,14 @@ class UTM:
"""
self.info_only = info_only
self.module = module
self.request_url = f"{module.params.get('utm_protocol')}://{module.params.get('utm_host')}:{module.params.get('utm_port')}/api/objects/{endpoint}/"
self.request_url = f"{module.params['utm_protocol']}://{module.params['utm_host']}:{module.params['utm_port']}/api/objects/{endpoint}/"
"""
The change_relevant_keys will be checked for changes to determine whether the object needs to be updated
"""
self.change_relevant_keys = change_relevant_keys
self.module.params["url_username"] = "token"
self.module.params["url_password"] = module.params.get("utm_token")
self.module.params["url_password"] = module.params["utm_token"]
if all(elem in self.change_relevant_keys for elem in module.params.keys()):
raise UTMModuleConfigurationError(
f"The keys {self.change_relevant_keys} to check are not in the modules keys:\n{list(module.params.keys())}"
@ -106,9 +106,9 @@ class UTM:
def execute(self):
try:
if not self.info_only:
if self.module.params.get("state") == "present":
if self.module.params["state"] == "present":
self._add()
elif self.module.params.get("state") == "absent":
elif self.module.params["state"] == "absent":
self._remove()
else:
self._info()
@ -170,9 +170,9 @@ class UTM:
:return: A combined headers dict
"""
default_headers = {"Accept": "application/json", "Content-type": "application/json"}
if self.module.params.get("headers") is not None:
if self.module.params["headers"] is not None:
result = default_headers.copy()
result.update(self.module.params.get("headers"))
result.update(self.module.params["headers"])
else:
result = default_headers
return result
@ -208,7 +208,7 @@ class UTM:
result = None
if response is not None:
results = json.loads(response.read())
result = next((d for d in results if d["name"] == module.params.get("name")), None)
result = next((d for d in results if d["name"] == module.params["name"]), None)
return info, result
def _clean_result(self, result):
@ -235,4 +235,4 @@ class UTM:
:param result: The result from the query
:return:
"""
return any(module.params.get(key) != result[key] for key in keys)
return any(module.params[key] != result[key] for key in keys)

View file

@ -43,9 +43,9 @@ def get_array(module: AnsibleModule):
"""Return storage array object or fail"""
global VXOS_VERSION
array = module.params["array"]
user = module.params.get("user", None)
password = module.params.get("password", None)
validate = module.params.get("validate_certs")
user = module.params["user"]
password = module.params["password"]
validate = module.params["validate_certs"]
if not HAS_VEXATAPI:
module.fail_json(msg="vexatapi library is required for this module. To install, use `pip install vexatapi`")

View file

@ -91,7 +91,7 @@ from ansible_collections.community.general.plugins.module_utils._pritunl_api imp
def add_pritunl_organization(module):
result = {}
org_name = module.params.get("name")
org_name = module.params["name"]
org_obj_list = list_pritunl_organizations(
**dict_merge(
@ -121,8 +121,8 @@ def add_pritunl_organization(module):
def remove_pritunl_organization(module):
result = {}
org_name = module.params.get("name")
force = module.params.get("force")
org_name = module.params["name"]
force = module.params["force"]
org_obj_list = []
@ -179,7 +179,7 @@ def main():
module = AnsibleModule(argument_spec=argument_spec)
state = module.params.get("state")
state = module.params["state"]
try:
if state == "present":

View file

@ -82,7 +82,7 @@ from ansible_collections.community.general.plugins.module_utils._pritunl_api imp
def get_pritunl_organizations(module):
org_name = module.params.get("organization")
org_name = module.params["organization"]
organizations = []

View file

@ -157,17 +157,17 @@ from ansible_collections.community.general.plugins.module_utils._pritunl_api imp
def add_or_update_pritunl_user(module):
result = {}
org_name = module.params.get("organization")
user_name = module.params.get("user_name")
org_name = module.params["organization"]
user_name = module.params["user_name"]
user_params = {
"name": user_name,
"email": module.params.get("user_email"),
"groups": module.params.get("user_groups"),
"disabled": module.params.get("user_disabled"),
"gravatar": module.params.get("user_gravatar"),
"mac_addresses": module.params.get("user_mac_addresses"),
"type": module.params.get("user_type"),
"email": module.params["user_email"],
"groups": module.params["user_groups"],
"disabled": module.params["user_disabled"],
"gravatar": module.params["user_gravatar"],
"mac_addresses": module.params["user_mac_addresses"],
"type": module.params["user_type"],
}
org_obj_list = list_pritunl_organizations(
@ -251,8 +251,8 @@ def add_or_update_pritunl_user(module):
def remove_pritunl_user(module):
result = {}
org_name = module.params.get("organization")
user_name = module.params.get("user_name")
org_name = module.params["organization"]
user_name = module.params["user_name"]
org_obj_list = []
@ -322,7 +322,7 @@ def main():
module = AnsibleModule(argument_spec=argument_spec)
state = module.params.get("state")
state = module.params["state"]
try:
if state == "present":

View file

@ -101,9 +101,9 @@ from ansible_collections.community.general.plugins.module_utils._pritunl_api imp
def get_pritunl_user(module):
user_name = module.params.get("user_name")
user_type = module.params.get("user_type")
org_name = module.params.get("organization")
user_name = module.params["user_name"]
user_type = module.params["user_type"]
org_name = module.params["organization"]
org_obj_list = []

View file

@ -95,7 +95,7 @@ def get_volume(module, array):
def validate_size(module, err_msg):
size = module.params.get("size", False)
size = module.params["size"]
if not size:
module.fail_json(msg=err_msg)
size = size_to_MiB(size)