1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00

Address issues reported by ruff check (#11043)

* Resolve E713 and E714 (not in/is tests).

* Address UP018 (unnecessary str call).

* UP045 requires Python 3.10+.

* Address UP007 (X | Y for type annotations).

* Address UP035 (import Callable from collections.abc).

* Address UP006 (t.Dict -> dict).

* Address UP009 (UTF-8 encoding comment).

* Address UP034 (extraneous parantheses).

* Address SIM910 (dict.get() with None default).

* Address F401 (unused import).

* Address UP020 (use builtin open).

* Address B009 and B010 (getattr/setattr with constant name).

* Address SIM300 (Yoda conditions).

* UP029 isn't in use anyway.

* Address FLY002 (static join).

* Address B034 (re.sub positional args).

* Address B020 (loop variable overrides input).

* Address B017 (assert raise Exception).

* Address SIM211 (if expression with false/true).

* Address SIM113 (enumerate for loop).

* Address UP036 (sys.version_info checks).

* Remove unnecessary UP039.

* Address SIM201 (not ==).

* Address SIM212 (if expr with twisted arms).

* Add changelog fragment.

* Reformat.
This commit is contained in:
Felix Fontein 2025-11-08 05:05:21 +01:00 committed by GitHub
parent f5943201b9
commit 3478863ef0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
77 changed files with 196 additions and 222 deletions

View file

@ -0,0 +1,64 @@
bugfixes:
- timestamp callback plugin - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- wsl connection plugin - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- json_patch filter plugin - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- to_* time filter plugins - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- linode inventory plugin - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- _filelock module utils - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- btrfs module utils - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- module_helper module utils - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- oneandone module utils - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- opennebula module utils - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- ali_instance - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- ali_instance_info - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- apt_rpm - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- btrfs_subvolume - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- consul - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- cronvar - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- dnf_versionlock - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- dnsmadeeasy - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- gitlab_issue - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- gitlab_merge_request - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- gitlab_project - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- gitlab_user - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- haproxy - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- homebrew - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- homebrew_services - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- hpilo_boot - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- ini_file - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- ipa_otptoken - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- jenkins_credential - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- jenkins_plugin - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- kea_command - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- keycloak_authz_permission - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- keycloak_clientscope_type - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- keycloak_user_execute_actions_email - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- keycloak_user_federation - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- listen_ports_facts - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- lxc_container - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- lxd_container - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- matrix - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- maven_artifact - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- monit - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- netcup_dns - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- nmcli - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- nomad_job - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- npm - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- one_host - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- one_image - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- one_template - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- one_vm - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- one_vnet - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- onepassword_info - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- pamd - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- pkgin - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- pulp_repo - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- redhat_subscription - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- rhevm - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- sensu_check - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- simpleinit_msb - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- sorcery - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- terraform - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- timezone - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- xenserver_guest - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).
- zypper_repository - improve Python code (https://github.com/ansible-collections/community.general/pull/11043).

View file

@ -119,5 +119,5 @@ class CallbackModule(Default):
tzinfo = ZoneInfo(self.get_option("timezone"))
# Inject options into the display object
setattr(self._display, "timestamp_tzinfo", tzinfo)
setattr(self._display, "timestamp_format_string", self.get_option("format_string"))
self._display.timestamp_tzinfo = tzinfo
self._display.timestamp_format_string = self.get_option("format_string")

View file

@ -416,7 +416,7 @@ class Connection(ConnectionBase):
sock_kwarg = {}
if proxy_command:
replacers: t.Dict[str, str] = {
replacers: dict[str, str] = {
"%h": self.get_option("remote_addr"),
"%p": str(port),
"%r": self.get_option("remote_user"),
@ -457,7 +457,7 @@ class Connection(ConnectionBase):
paramiko_preferred_pubkeys = getattr(paramiko.Transport, "_preferred_pubkeys", ())
paramiko_preferred_hostkeys = getattr(paramiko.Transport, "_preferred_keys", ())
use_rsa_sha2_algorithms = self.get_option("use_rsa_sha2_algorithms")
disabled_algorithms: t.Dict[str, t.Iterable[str]] = {}
disabled_algorithms: dict[str, t.Iterable[str]] = {}
if not use_rsa_sha2_algorithms:
if paramiko_preferred_pubkeys:
disabled_algorithms["pubkeys"] = tuple(a for a in paramiko_preferred_pubkeys if "rsa-sha2" in a)

View file

@ -10,7 +10,8 @@ from ansible.errors import AnsibleFilterError
if t.TYPE_CHECKING:
from typing import Any, Callable, Union
from typing import Any
from collections.abc import Callable
JSONPATCH_IMPORT_ERROR: ImportError | None
try:
@ -60,7 +61,7 @@ class FilterModule:
def json_patch(
self,
inp: Union[str, list, dict, bytes, bytearray],
inp: str | list | dict | bytes | bytearray,
op: str,
path: str,
value: Any = None,
@ -105,7 +106,7 @@ class FilterModule:
def json_patch_recipe(
self,
inp: Union[str, list, dict, bytes, bytearray],
inp: str | list | dict | bytes | bytearray,
operations: list,
/,
fail_test: bool = False,
@ -141,8 +142,8 @@ class FilterModule:
def json_diff(
self,
inp: Union[str, list, dict, bytes, bytearray],
target: Union[str, list, dict, bytes, bytearray],
inp: str | list | dict | bytes | bytearray,
target: str | list | dict | bytes | bytearray,
) -> list:
if not HAS_LIB:
raise AnsibleFilterError(

View file

@ -56,11 +56,9 @@ def to_time_unit(human_time, unit="ms", **kwargs):
unit = unit_to_short_form.get(unit.rstrip("s"), unit)
if unit not in unit_factors:
raise AnsibleFilterError(
(
f"to_time_unit() can not convert to the following unit: {unit}. Available units (singular or plural):"
f"{', '.join(unit_to_short_form.keys())}. Available short units: {', '.join(unit_factors.keys())}"
)
)
if "year" in kwargs:
unit_factors["y"] = unit_factors["y"][:-1] + [kwargs.pop("year")]

View file

@ -151,7 +151,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
access_token = self.templar.template(variable=access_token)
if access_token is None:
raise AnsibleError(("Could not retrieve Linode access token from plugin configuration sources"))
raise AnsibleError("Could not retrieve Linode access token from plugin configuration sources")
self.client = LinodeClient(access_token)

View file

@ -11,7 +11,6 @@ import os
import stat
import time
import fcntl
import sys
from contextlib import contextmanager
@ -59,9 +58,6 @@ class FileLock:
"""
lock_path = os.path.join(tmpdir, f"ansible-{os.path.basename(path)}.lock")
l_wait = 0.1
r_exception = IOError
if sys.version_info[0] == 3:
r_exception = BlockingIOError
self.lockfd = open(lock_path, "w")
@ -77,7 +73,7 @@ class FileLock:
fcntl.flock(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
os.chmod(lock_path, stat.S_IWRITE | stat.S_IREAD)
return True
except r_exception:
except BlockingIOError:
time.sleep(l_wait)
e_secs += l_wait
continue

View file

@ -190,7 +190,7 @@ class BtrfsSubvolume:
return mountpoints is not None and len(mountpoints) > 0
def is_filesystem_root(self):
return 5 == self.__subvolume_id
return self.__subvolume_id == 5
def is_filesystem_default(self):
return self.__filesystem.default_subvolid == self.__subvolume_id

View file

@ -7,12 +7,12 @@ from __future__ import annotations
# pylint: disable=unused-import
from ansible_collections.community.general.plugins.module_utils.mh.module_helper import (
from ansible_collections.community.general.plugins.module_utils.mh.module_helper import ( # noqa: F401
ModuleHelper,
StateModuleHelper,
)
from ansible_collections.community.general.plugins.module_utils.mh.exceptions import ModuleHelperException # noqa: F401
from ansible_collections.community.general.plugins.module_utils.mh.deco import (
from ansible_collections.community.general.plugins.module_utils.mh.deco import ( # noqa: F401
cause_changes,
module_fails_on_exception,
check_mode_skip,

View file

@ -38,7 +38,7 @@ def get_resource(oneandone_conn, resource_type, resource_id):
"vpn": oneandone_conn.get_vpn,
}
return switcher.get(resource_type, None)(resource_id)
return switcher.get(resource_type)(resource_id)
def get_datacenter(oneandone_conn, datacenter, full_object=False):

View file

@ -189,7 +189,7 @@ class OpenNebulaModule:
if "cluster_name" in self.module.params:
clusters = self.one.clusterpool.info()
for cluster in clusters.CLUSTER:
if cluster.NAME == self.module.params.get("cluster_name"):
if self.module.params.get("cluster_name") == cluster.NAME:
resolved_params["cluster_id"] = cluster.ID
return resolved_params
@ -223,7 +223,7 @@ class OpenNebulaModule:
"""
hosts = self.one.hostpool.info()
for h in hosts.HOST:
if h.NAME == name:
if name == h.NAME:
return h
return None
@ -238,7 +238,7 @@ class OpenNebulaModule:
clusters = self.one.clusterpool.info()
for c in clusters.CLUSTER:
if c.NAME == name:
if name == c.NAME:
return c
return None
@ -253,7 +253,7 @@ class OpenNebulaModule:
"""
templates = self.one.templatepool.info()
for t in templates.TEMPLATE:
if t.NAME == name:
if name == t.NAME:
return t
return None
@ -305,7 +305,7 @@ class OpenNebulaModule:
intersection[dkey] = current[dkey]
else:
return True
return not (desired == intersection)
return desired != intersection
def wait_for_state(
self,

View file

@ -631,8 +631,8 @@ def get_instances_info(connection, ids):
if len(instances) > 0:
for inst in instances:
volumes = connection.describe_disks(instance_id=inst.id)
setattr(inst, "block_device_mappings", volumes)
setattr(inst, "user_data", inst.describe_user_data())
inst.block_device_mappings = volumes
inst.user_data = inst.describe_user_data()
result.append(inst.read())
return result
@ -748,7 +748,7 @@ def modify_instance(module, instance):
password = module.params["password"]
# userdata can be modified only when instance is stopped
setattr(instance, "user_data", instance.describe_user_data())
instance.user_data = instance.describe_user_data()
user_data = instance.user_data
if state == "stopped":
user_data = module.params["user_data"].encode()

View file

@ -389,8 +389,8 @@ def main():
if not str(inst.instance_name).startswith(name_prefix):
continue
volumes = ecs.describe_disks(instance_id=inst.id)
setattr(inst, "block_device_mappings", volumes)
setattr(inst, "user_data", inst.describe_user_data())
inst.block_device_mappings = volumes
inst.user_data = inst.describe_user_data()
instances.append(inst.read())
instance_ids.append(inst.id)

View file

@ -215,9 +215,9 @@ def update_package_db(module):
def dir_size(module, path):
total_size = 0
for path, dirs, files in os.walk(path):
for cur_path, dirs, files in os.walk(path):
for f in files:
total_size += os.path.getsize(os.path.join(path, f))
total_size += os.path.getsize(os.path.join(cur_path, f))
return total_size

View file

@ -568,7 +568,7 @@ class BtrfsSubvolumeModule:
last = None
ordered = sorted(subvolumes, key=lambda x: x.path)
for next in ordered:
if last is None or not next.path[0 : len(last)] == last:
if last is None or next.path[0 : len(last)] != last:
filtered.append(next)
last = next.path
return filtered

View file

@ -324,7 +324,7 @@ def add_service(module, service):
# there is no way to retrieve the details of checks so if a check is present
# in the service it must be re-registered
if service.has_checks() or not existing or not existing == service:
if service.has_checks() or not existing or existing != service:
service.register(consul_api)
# check that it registered correctly
registered = get_service_by_id_or_name(consul_api, service.id)

View file

@ -171,15 +171,13 @@ class CronVar:
raise CronVarError("Unable to read crontab")
lines = out.splitlines()
count = 0
for l in lines:
for count, l in enumerate(lines):
if count > 2 or (
not re.match(r"# DO NOT EDIT THIS FILE - edit the master and reinstall.", l)
and not re.match(r"# \(/tmp/.*installed on.*\)", l)
and not re.match(r"# \(.*version.*\)", l)
):
self.lines.append(l)
count += 1
def log_message(self, message):
self.module.debug(f'ansible: "{message}"')

View file

@ -139,7 +139,7 @@ NEVRA_RE = re.compile(r"^(?P<name>.+)-(?P<epoch>\d+):(?P<version>.+)-(?P<release
def do_versionlock(module, command, patterns=None, raw=False):
patterns = [] if not patterns else patterns
patterns = patterns if patterns else []
raw_parameter = ["--raw"] if raw else []
# Call dnf versionlock using a just one full NEVR package-name-spec each
# time because multiple package-name-spec and globs are not well supported.

View file

@ -608,7 +608,7 @@ def main():
current_record = DME.getMatchingRecord(record_name, record_type, record_value)
new_record = {"name": record_name}
for i in ["record_value", "record_type", "record_ttl"]:
if not module.params[i] is None:
if module.params[i] is not None:
new_record[i[len("record_") :]] = module.params[i]
# Special handling for mx record
if new_record["type"] == "MX":

View file

@ -271,11 +271,11 @@ class GitlabIssue:
for key, value in options.items():
if value is not None:
if key == "milestone_id":
old_milestone = getattr(issue, "milestone")["id"] if getattr(issue, "milestone") else ""
old_milestone = issue.milestone["id"] if issue.milestone else ""
if value != old_milestone:
return True
elif key == "assignee_ids":
if value != sorted([user["id"] for user in getattr(issue, "assignees")]):
if value != sorted([user["id"] for user in issue.assignees]):
return True
elif key == "labels":

View file

@ -272,11 +272,11 @@ class GitlabMergeRequest:
key = "force_remove_source_branch"
if key == "assignee_ids":
if value != sorted([user["id"] for user in getattr(mr, "assignees")]):
if value != sorted([user["id"] for user in mr.assignees]):
return True
elif key == "reviewer_ids":
if value != sorted([user["id"] for user in getattr(mr, "reviewers")]):
if value != sorted([user["id"] for user in mr.reviewers]):
return True
elif key == "labels":

View file

@ -572,7 +572,7 @@ class GitLabProject:
if all(old_val.get(key) == value for key, value in final_val.items()):
continue
setattr(project, "container_expiration_policy_attributes", final_val)
project.container_expiration_policy_attributes = final_val
else:
setattr(project, arg_key, arg_value)
changed = True

View file

@ -499,8 +499,8 @@ class GitLabUser:
for identity in identities:
if identity not in user.identities:
setattr(user, "provider", identity["provider"])
setattr(user, "extern_uid", identity["extern_uid"])
user.provider = identity["provider"]
user.extern_uid = identity["extern_uid"]
if not self._module.check_mode:
user.save()
changed = True

View file

@ -413,7 +413,7 @@ class HAProxy:
haproxy_version = self.discover_version()
# check if haproxy version supports DRAIN state (starting with 1.5)
if haproxy_version and (1, 5) <= haproxy_version:
if haproxy_version and haproxy_version >= (1, 5):
cmd = "set server $pxname/$svname state drain"
self.execute_for_backends(cmd, backend, host, "DRAIN")
if status == "MAINT":

View file

@ -431,7 +431,7 @@ class Homebrew:
if len(package_names) != 1:
self.failed = True
self.message = (
f"Package names for {name} are missing or ambiguous: {', '.join((str(p) for p in package_names))}"
f"Package names for {name} are missing or ambiguous: {', '.join(str(p) for p in package_names)}"
)
raise HomebrewException(self.message)

View file

@ -87,7 +87,7 @@ running:
"""
import json
import sys
from typing import NamedTuple, Optional
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.homebrew import (
@ -95,25 +95,12 @@ from ansible_collections.community.general.plugins.module_utils.homebrew import
parse_brew_path,
)
if sys.version_info < (3, 5):
from collections import namedtuple
# Stores validated arguments for an instance of an action.
# See DOCUMENTATION string for argument-specific information.
HomebrewServiceArgs = NamedTuple("HomebrewServiceArgs", [("name", str), ("state", str), ("brew_path", str)])
# Stores validated arguments for an instance of an action.
# See DOCUMENTATION string for argument-specific information.
HomebrewServiceArgs = namedtuple("HomebrewServiceArgs", ["name", "state", "brew_path"])
# Stores the state of a Homebrew service.
HomebrewServiceState = namedtuple("HomebrewServiceState", ["running", "pid"])
else:
from typing import NamedTuple, Optional
# Stores validated arguments for an instance of an action.
# See DOCUMENTATION string for argument-specific information.
HomebrewServiceArgs = NamedTuple("HomebrewServiceArgs", [("name", str), ("state", str), ("brew_path", str)])
# Stores the state of a Homebrew service.
HomebrewServiceState = NamedTuple("HomebrewServiceState", [("running", bool), ("pid", Optional[int])])
# Stores the state of a Homebrew service.
HomebrewServiceState = NamedTuple("HomebrewServiceState", [("running", bool), ("pid", Optional[int])])
def _brew_service_state(args, module):

View file

@ -219,7 +219,7 @@ def main():
elif state in ("poweroff"):
power_status = ilo.get_host_power_status()
if not power_status == "OFF":
if power_status != "OFF":
ilo.hold_pwr_btn()
# ilo.set_host_power(host_power=False)
changed = True

View file

@ -254,7 +254,6 @@ EXAMPLES = r"""
state: present
"""
import io
import os
import re
import tempfile
@ -351,7 +350,7 @@ def do_ini(
os.makedirs(destpath)
ini_lines = []
else:
with io.open(target_filename, "r", encoding="utf-8-sig") as ini_file:
with open(target_filename, "r", encoding="utf-8-sig") as ini_file:
ini_lines = [to_text(line) for line in ini_file.readlines()]
if module._diff:

View file

@ -252,7 +252,7 @@ def get_otptoken_dict(
if owner is not None:
otptoken[ansible_to_ipa["owner"]] = owner
if enabled is not None:
otptoken[ansible_to_ipa["enabled"]] = False if enabled else True
otptoken[ansible_to_ipa["enabled"]] = not enabled
if notbefore is not None:
otptoken[ansible_to_ipa["notbefore"]] = f"{notbefore}Z"
if notafter is not None:

View file

@ -455,7 +455,7 @@ def delete_target(module, headers):
)
status = info.get("status", 0)
if not status == 200:
if status != 200:
module.fail_json(msg=f"Failed to delete: HTTP {status}, {response}, {headers}")
except Exception as e:
@ -605,7 +605,7 @@ def run_module():
does_exist = target_exists(module)
# Check if the credential/domain doesn't exist and the user wants to delete
if not does_exist and state == "absent" and not type == "token":
if not does_exist and state == "absent" and type != "token":
result["changed"] = False
result["msg"] = f"{id} does not exist."
module.exit_json(**result)
@ -762,7 +762,7 @@ def run_module():
payload = {"credentials": credentials}
if not type == "file" and not type == "token":
if type != "file" and type != "token":
body = urlencode({"json": json.dumps(payload)})
else: # Delete
@ -770,7 +770,7 @@ def run_module():
module.exit_json(changed=True, msg=f"{id} deleted successfully.")
if not type == "scope" and not scope == "_": # Check if custom scope exists if adding to a custom scope
if type != "scope" and scope != "_": # Check if custom scope exists if adding to a custom scope
if not target_exists(module, True):
module.fail_json(msg=f"Domain {scope} doesn't exists")
@ -781,7 +781,7 @@ def run_module():
status = info.get("status", 0)
if not status == 200:
if status != 200:
body = response.read() if response else b""
module.fail_json(
msg=f"Failed to {'add/update' if state == 'present' else 'delete'} credential",

View file

@ -332,7 +332,6 @@ state:
"""
import hashlib
import io
import json
import os
import tempfile
@ -770,7 +769,7 @@ class JenkinsPlugin:
# Open the updates file
try:
f = io.open(tmp_updates_file, encoding="utf-8")
f = open(tmp_updates_file, encoding="utf-8")
# Read only the second line
dummy = f.readline()

View file

@ -1,5 +1,4 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright © Thorsten Glaser <tglaser@b1-systems.de>

View file

@ -339,7 +339,7 @@ def main():
# defined resource does not include all those scopes.
for scope in scopes:
s = kc.get_authz_authorization_scope_by_name(scope, cid, realm)
if r and not s["id"] in resource_scopes:
if r and s["id"] not in resource_scopes:
module.fail_json(
msg=f"Resource {resources[0]} does not include scope {scope} for client {client_id} in realm {realm}"
)

View file

@ -185,7 +185,7 @@ def clientscopes_to_add(existing, proposed):
to_add = []
existing_clientscope_ids = extract_field(existing, "id")
for clientscope in proposed:
if not clientscope["id"] in existing_clientscope_ids:
if clientscope["id"] not in existing_clientscope_ids:
to_add.append(clientscope)
return to_add
@ -194,7 +194,7 @@ def clientscopes_to_delete(existing, proposed):
to_delete = []
proposed_clientscope_ids = extract_field(proposed, "id")
for clientscope in existing:
if not clientscope["id"] in proposed_clientscope_ids:
if clientscope["id"] not in proposed_clientscope_ids:
to_delete.append(clientscope)
return to_delete

View file

@ -1,5 +1,4 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2025, mariusbertram <marius@brtrm.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)

View file

@ -1044,7 +1044,7 @@ def main():
# we remove all unwanted default mappers
# we use ids so we dont accidently remove one of the previously updated default mapper
for default_mapper in default_mappers:
if not default_mapper["id"] in [x["id"] for x in updated_mappers]:
if default_mapper["id"] not in [x["id"] for x in updated_mappers]:
kc.delete_component(default_mapper["id"], realm)
after_comp["mappers"] = kc.get_components(urlencode(dict(parent=cid)), realm)
@ -1088,7 +1088,7 @@ def main():
for before_mapper in before_comp.get("mappers", []):
# remove unwanted existing mappers that will not be updated
if not before_mapper["id"] in [x["id"] for x in desired_mappers if "id" in x]:
if before_mapper["id"] not in [x["id"] for x in desired_mappers if "id" in x]:
kc.delete_component(before_mapper["id"], realm)
for mapper in desired_mappers:

View file

@ -293,7 +293,7 @@ def ss_parse(raw):
try:
if len(cells) == 6:
# no process column, e.g. due to unprivileged user
process = str()
process = ""
protocol, state, recv_q, send_q, local_addr_port, peer_addr_port = cells
else:
protocol, state, recv_q, send_q, local_addr_port, peer_addr_port, process = cells
@ -312,7 +312,7 @@ def ss_parse(raw):
if pids is None:
# likely unprivileged user, so add empty name & pid
# as we do in netstat logic to be consistent with output
pids = [(str(), 0)]
pids = [("", 0)]
address = conns.group(1)
port = conns.group(2)

View file

@ -857,7 +857,7 @@ class LxcContainerManagement:
if self._container_exists(container_name=self.container_name, lxc_path=self.lxc_path):
return str(self.container.state).lower()
return str("absent")
return "absent"
def _execute_command(self):
"""Execute a shell command."""

View file

@ -714,7 +714,7 @@ class LXDContainerManagement:
if self._needs_to_change_instance_config(param):
if param == "config":
body_json["config"] = body_json.get("config", None) or {}
body_json["config"] = body_json.get("config") or {}
for k, v in self.config["config"].items():
body_json["config"][k] = v
else:

View file

@ -1,5 +1,4 @@
#!/usr/bin/python
# coding: utf-8
# Copyright (c) 2018, Jan Christian Grünhage <jan.christian@gruenhage.xyz>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)

View file

@ -237,7 +237,6 @@ import hashlib
import os
import posixpath
import shutil
import io
import tempfile
import traceback
import re
@ -503,7 +502,7 @@ class MavenDownloader:
if self.local:
parsed_url = urlparse(url)
if os.path.isfile(parsed_url.path):
with io.open(parsed_url.path, "rb") as f:
with open(parsed_url.path, "rb") as f:
return f.read()
if force:
raise ValueError(f"{failmsg} because can not find file: {url}")
@ -630,7 +629,7 @@ class MavenDownloader:
hash = hashlib.sha1()
else:
raise ValueError(f"Unknown checksum_alg {checksum_alg}")
with io.open(file, "rb") as f:
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
hash.update(chunk)
return hash.hexdigest()

View file

@ -149,7 +149,7 @@ class Monit:
:@param validate: Force monit to re-check the status of the process
"""
monit_command = "validate" if validate else "status"
check_rc = False if validate else True # 'validate' always has rc = 1
check_rc = not validate # 'validate' always has rc = 1
command = [self.monit_bin_path, monit_command] + self.command_args + [self.process_name]
rc, out, err = self.module.run_command(command, check_rc=check_rc)
return self._parse_status(out, err)
@ -307,7 +307,7 @@ def main():
present = monit.is_process_present()
if not present and not state == "present":
if not present and state != "present":
module.fail_json(msg=f"{name} process not presently configured with monit", name=name)
if state == "present":

View file

@ -279,7 +279,7 @@ def main():
for r in all_records
if r.hostname == record.hostname
and r.type == record.type
and not r.destination == record.destination
and r.destination != record.destination
]
if obsolete_records:

View file

@ -2878,7 +2878,7 @@ def main():
# team checks
if nmcli.type == "team":
if nmcli.runner_hwaddr_policy and not nmcli.runner == "activebackup":
if nmcli.runner_hwaddr_policy and nmcli.runner != "activebackup":
nmcli.module.fail_json(msg="Runner-hwaddr-policy is only allowed for runner activebackup")
if nmcli.runner_fast_rate is not None and nmcli.runner != "lacp":
nmcli.module.fail_json(msg="runner-fast-rate is only allowed for runner lacp")

View file

@ -163,7 +163,7 @@ def run():
if job_id is None:
module.fail_json(msg="Cannot retrieve job with ID None")
plan = nomad_client.job.plan_job(job_id, job, diff=True)
if not plan["Diff"].get("Type") == "None":
if plan["Diff"].get("Type") != "None":
changed = True
if not module.check_mode:
result = nomad_client.jobs.register_job(job)
@ -186,7 +186,7 @@ def run():
try:
job_id = job_json.get("ID")
plan = nomad_client.job.plan_job(job_id, job, diff=True)
if not plan["Diff"].get("Type") == "None":
if plan["Diff"].get("Type") != "None":
changed = True
if not module.check_mode:
result = nomad_client.jobs.register_job(job)
@ -215,7 +215,7 @@ def run():
result = nomad_client.jobs.register_job(job)
else:
result = nomad_client.validate.validate_job(job)
if not result.status_code == 200:
if result.status_code != 200:
module.fail_json(msg=to_native(result.text))
result = json.loads(result.text)
changed = True
@ -224,7 +224,7 @@ def run():
if module.params.get("state") == "absent":
try:
if not module.params.get("name") is None:
if module.params.get("name") is not None:
job_name = module.params.get("name")
else:
if module.params.get("content_format") == "hcl":

View file

@ -280,7 +280,7 @@ class Npm:
if dep:
# node.js v0.10.22 changed the `npm outdated` module separator
# from "@" to " ". Split on both for backwards compatibility.
pkg, other = re.split(r"\s|@", dep, 1)
pkg, other = re.split(r"\s|@", dep, maxsplit=1)
outdated.append(pkg)
return outdated

View file

@ -282,7 +282,7 @@ class HostModule(OpenNebulaModule):
self.fail(msg=f"Failed to update the host template, ERROR: {e}")
# the cluster
if host.CLUSTER_ID != self.get_parameter("cluster_id"):
if self.get_parameter("cluster_id") != host.CLUSTER_ID:
# returns cluster id in int
try:
one.cluster.addhost(self.get_parameter("cluster_id"), host.ID)

View file

@ -466,10 +466,10 @@ class ImageModule(OpenNebulaModule):
return None
def get_image_by_name(self, image_name):
return self.get_image(lambda image: (image.NAME == image_name))
return self.get_image(lambda image: (image_name == image.NAME))
def get_image_by_id(self, image_id):
return self.get_image(lambda image: (image.ID == image_id))
return self.get_image(lambda image: (image_id == image.ID))
def get_image_instance(self, requested_id, requested_name):
# Using 'if requested_id:' doesn't work properly when requested_id=0
@ -589,7 +589,7 @@ class ImageModule(OpenNebulaModule):
result["changed"] = False
return result
if image.STATE == IMAGE_STATES.index("DISABLED"):
if IMAGE_STATES.index("DISABLED") == image.STATE:
self.module.fail_json(msg="Cannot clone DISABLED image")
if not self.module.check_mode:

View file

@ -226,10 +226,10 @@ class TemplateModule(OpenNebulaModule):
return None
def get_template_by_id(self, template_id, filter):
return self.get_template(lambda template: (template.ID == template_id), filter)
return self.get_template(lambda template: (template_id == template.ID), filter)
def get_template_by_name(self, name, filter):
return self.get_template(lambda template: (template.NAME == name), filter)
return self.get_template(lambda template: (name == template.NAME), filter)
def get_template_instance(self, requested_id, requested_name, filter):
if requested_id:
@ -270,7 +270,7 @@ class TemplateModule(OpenNebulaModule):
result["changed"] = True
else:
# if the previous parsed template data is not equal to the updated one, this has changed
result["changed"] = template.TEMPLATE != result["template"]
result["changed"] = result["template"] != template.TEMPLATE
return result

View file

@ -764,11 +764,11 @@ def get_template(module, client, predicate):
def get_template_by_name(module, client, template_name):
return get_template(module, client, lambda template: (template.NAME == template_name))
return get_template(module, client, lambda template: (template_name == template.NAME))
def get_template_by_id(module, client, template_id):
return get_template(module, client, lambda template: (template.ID == template_id))
return get_template(module, client, lambda template: (template_id == template.ID))
def get_template_id(module, client, requested_id, requested_name):
@ -803,11 +803,11 @@ def get_datastore(module, client, predicate):
def get_datastore_by_name(module, client, datastore_name):
return get_datastore(module, client, lambda datastore: (datastore.NAME == datastore_name))
return get_datastore(module, client, lambda datastore: (datastore_name == datastore.NAME))
def get_datastore_by_id(module, client, datastore_id):
return get_datastore(module, client, lambda datastore: (datastore.ID == datastore_id))
return get_datastore(module, client, lambda datastore: (datastore_id == datastore.ID))
def get_datastore_id(module, client, requested_id, requested_name):
@ -887,7 +887,7 @@ def get_vm_info(client, vm):
# LCM_STATE is VM's sub-state that is relevant only when STATE is ACTIVE
vm_lcm_state = None
if vm.STATE == VM_STATES.index("ACTIVE"):
if VM_STATES.index("ACTIVE") == vm.STATE:
vm_lcm_state = LCM_STATES[vm.LCM_STATE]
vm_labels, vm_attributes = get_vm_labels_and_attributes_dict(client, vm.ID)
@ -1141,7 +1141,7 @@ def get_all_vms_by_attributes(client, attributes_dict, labels_list):
if with_hash and vm.NAME[len(base_name) :].isdigit():
# If the name has indexed format and after base_name it has only digits it'll be matched
vm_list.append(vm)
elif not with_hash and vm.NAME == name:
elif not with_hash and name == vm.NAME:
# If the name is not indexed it has to be same
vm_list.append(vm)
pool = vm_list
@ -1600,7 +1600,7 @@ def disk_save_as(module, client, vm, disk_saveas, wait_timeout):
disk_id = disk_saveas.get("disk_id", 0)
if not module.check_mode:
if vm.STATE != VM_STATES.index("POWEROFF"):
if VM_STATES.index("POWEROFF") != vm.STATE:
module.fail_json(msg="'disksaveas' option can be used only when the VM is in 'POWEROFF' state")
try:
client.vm.disksaveas(vm.ID, disk_id, image_name, "OS", -1)

View file

@ -320,10 +320,10 @@ class NetworksModule(OpenNebulaModule):
return None
def get_template_by_id(self, template_id):
return self.get_template(lambda template: (template.ID == template_id))
return self.get_template(lambda template: (template_id == template.ID))
def get_template_by_name(self, name):
return self.get_template(lambda template: (template.NAME == name))
return self.get_template(lambda template: (name == template.NAME))
def get_template_instance(self, requested_id, requested_name):
if requested_id:
@ -411,7 +411,7 @@ class NetworksModule(OpenNebulaModule):
result["changed"] = True
else:
# if the previous parsed template data is not equal to the updated one, this has changed
result["changed"] = template.TEMPLATE != result["template"]
result["changed"] = result["template"] != template.TEMPLATE
return result

View file

@ -252,8 +252,8 @@ class OnePasswordInfo:
module.fail_json(msg=f"Missing required 'name' field from search term, got: '{term}'")
term["field"] = term.get("field", "password")
term["section"] = term.get("section", None)
term["vault"] = term.get("vault", None)
term["section"] = term.get("section")
term["vault"] = term.get("vault")
processed_terms.append(term)

View file

@ -792,7 +792,7 @@ def main():
("state", "after", ["new_control", "new_type", "new_module_path"]),
],
)
content = str()
content = ""
fname = os.path.join(module.params["path"], module.params["name"])
# Open the file and read the content or fail

View file

@ -274,7 +274,7 @@ def install_packages(module, packages):
rc, out, err = module.run_command(format_pkgin_command(module, "install", package))
if not module.check_mode and not query_package(module, package) in [
if not module.check_mode and query_package(module, package) not in [
PackageState.PRESENT,
PackageState.OUTDATED,
]:

View file

@ -231,7 +231,7 @@ class pulp_server:
if key not in distributor["config"].keys():
return False
if not distributor["config"][key] == value:
if distributor["config"][key] != value:
return False
return True
@ -245,7 +245,7 @@ class pulp_server:
if key not in importer["config"].keys():
return False
if not importer["config"][key] == value:
if importer["config"][key] != value:
return False
return True

View file

@ -901,10 +901,10 @@ class RhsmPool:
return str(self.__getattribute__("_name"))
def get_pool_id(self):
return getattr(self, "PoolId", getattr(self, "PoolID"))
return getattr(self, "PoolId", self.PoolID)
def get_quantity_used(self):
return int(getattr(self, "QuantityUsed"))
return int(self.QuantityUsed)
def subscribe(self):
args = f"subscription-manager attach --pool {self.get_pool_id()}"

View file

@ -1078,14 +1078,13 @@ class RHEV:
def setDisks(self, name, disks):
self.__get_conn()
counter = 0
bootselect = False
for disk in disks:
if "bootable" in disk:
if disk["bootable"] is True:
bootselect = True
for disk in disks:
for counter, disk in enumerate(disks):
diskname = f"{name}_Disk{counter}_{disk.get('name', '').replace('/', '_')}"
disksize = disk.get("size", 1)
diskdomain = disk.get("domain", None)
@ -1110,7 +1109,6 @@ class RHEV:
else:
self.conn.set_Disk(diskname, disksize, diskinterface, diskboot)
checkFail()
counter += 1
return True

View file

@ -275,7 +275,7 @@ def sensu_check(module, path, name, state="present", backup=False):
for k, v in custom_params.items():
if k in config["checks"][name]:
if not config["checks"][name][k] == v:
if config["checks"][name][k] != v:
changed = True
reasons.append(f"`custom param {k}' was changed")
else:
@ -365,7 +365,7 @@ def main():
module = AnsibleModule(argument_spec=arg_spec, required_together=required_together, supports_check_mode=True)
if module.params["state"] != "absent" and module.params["command"] is None:
module.fail_json(msg="missing required arguments: %s" % ",".join(["command"]))
module.fail_json(msg="missing required arguments: command")
path = module.params["path"]
name = module.params["name"]

View file

@ -197,7 +197,7 @@ class SimpleinitMSB:
(rc, out, err) = self.execute_command(f"{self.telinit_cmd} {self.enable}d")
service_enabled = False if self.enable else True
service_enabled = not self.enable
rex = re.compile(rf"^{self.name}$")

View file

@ -736,7 +736,7 @@ def main():
state_msg = "no change in state"
state_changed = False
module.exit_json(changed=state_changed, msg=f"{state_msg}: {'; '.join((x[1] for x in changed.values()))}")
module.exit_json(changed=state_changed, msg=f"{state_msg}: {'; '.join(x[1] for x in changed.values())}")
if __name__ == "__main__":

View file

@ -756,7 +756,7 @@ def main():
cmd=" ".join(command),
)
# checks out to decide if changes were made during execution
if " 0 added, 0 changed" not in out and not state == "absent" or " 0 destroyed" not in out:
if " 0 added, 0 changed" not in out and state != "absent" or " 0 destroyed" not in out:
changed = True
if no_color:

View file

@ -840,7 +840,7 @@ class AIXTimezone(Timezone):
# The best condition check we can do is to check the value of TZ after making the
# change.
TZ = self.__get_timezone()
if TZ != value:
if value != TZ:
msg = f"TZ value does not match post-change (Actual: {TZ}, Expected: {value})."
self.module.fail_json(msg=msg)

View file

@ -802,9 +802,8 @@ class XenServerVM(XenServerObject):
vm_disk_params_list = [
disk_params for disk_params in self.vm_params["VBDs"] if disk_params["type"] == "Disk"
]
position = 0
for disk_change_list in change["disks_changed"]:
for position, disk_change_list in enumerate(change["disks_changed"]):
for disk_change in disk_change_list:
vdi_ref = self.xapi_session.xenapi.VDI.get_by_uuid(
vm_disk_params_list[position]["VDI"]["uuid"]
@ -829,7 +828,6 @@ class XenServerVM(XenServerObject):
),
)
position += 1
elif change.get("disks_new"):
for position, disk_userdevice in change["disks_new"]:
disk_params = self.module.params["disks"][position]

View file

@ -142,7 +142,7 @@ from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.urls import fetch_url
from ansible.module_utils.common.text.converters import to_text
from io import StringIO, open
from io import StringIO
from ansible_collections.community.general.plugins.module_utils.version import LooseVersion

View file

@ -19,59 +19,34 @@ ignore = [
"SIM108", # if-else-block-instead-of-if-exp
# To fix later:
"B905", # zip-without-explicit-strict - needs Python 3.10+
"UP045", # Use `X | None` for type annotations - needs Python 3.10+
# To fix:
"F401", # Unused import
"E721", # Type comparison
"E713", # "not in" test
"E714", # "not is" test
"F841", # Unused variable
"UP006", # Use `dict` instead of `t.Dict` for type annotation
"UP007", # Use `X | Y` for type annotations
"UP009", # UTF-8 encoding declaration is unnecessary
"UP014", # Convert `xxx` from `NamedTuple` functional to class syntax
"UP018", # Unnecessary `str` call (rewrite as a literal)
"UP020", # Use builtin `open`
"UP024", # Replace aliased errors with `OSError`
"UP028", # Replace `yield` over `for` loop with `yield from`
"UP029", # Unnecessary builtin import: `open`
"UP030", # Use implicit references for positional format fields
"UP031", # Use format specifiers instead of percent format
"UP034", # Avoid extraneous parentheses
"UP035", # Import from `collections.abc` instead: `Callable`
"UP036", # Version block is outdated for minimum Python version
"UP039", # Unnecessary parentheses after class definition
"UP041", # Replace aliased errors with `TimeoutError`
"UP045", # Use `X | None` for type annotations
"B007", # unused-loop-control-variable
"B009", # get-attr-with-constant
"B010", # set-attr-with-constant
"B015", # useless-comparison
"B017", # assert-raises-exception
"B020", # loop-variable-overrides-iterator
"B026", # star-arg-unpacking-after-keyword-arg
"B034", # re-sub-positional-args
"B904", # raise-without-from-inside-except
"SIM102", # collapsible-if
"SIM103", # needless-bool
"SIM110", # reimplemented-builtin
"SIM112", # uncapitalized-environment-variables
"SIM113", # enumerate-for-loop
"SIM114", # if-with-same-arms
"SIM115", # open-file-with-context-handler
"SIM116", # if-else-block-instead-of-dict-lookup
"SIM117", # multiple-with-statements
"SIM118", # in-dict-keys
"SIM201", # negate-equal-op
"SIM210", # if-expr-with-true-false
"SIM211", # if-expr-with-false-true
"SIM212", # if-expr-with-twisted-arms
"SIM300", # yoda-conditions
"SIM401", # if-else-block-instead-of-dict-get
"SIM910", # dict-get-with-none-default
"A001", # builtin-variable-shadowing
"A002", # builtin-argument-shadowing
"A004", # builtin-import-shadowing
"FLY002", # static-join-to-f-string
]
# Allow fix for all enabled rules (when `--fix`) is provided.

View file

@ -49,7 +49,7 @@ def lxc(request):
from ansible_collections.community.general.plugins.connection import lxc as lxc_plugin_module
assert lxc_plugin_module.HAS_LIBLXC == liblxc_present
assert liblxc_present == lxc_plugin_module.HAS_LIBLXC
assert bool(getattr(lxc_plugin_module, "_lxc", None)) == liblxc_present
yield lxc_plugin_module

View file

@ -411,15 +411,15 @@ def test_populate(inventory, mocker):
assert group_centos.hosts == [host_gitlab]
# check IPv4 address
assert "172.22.4.187" == host_sam.get_vars()["v4_first_ip"]
assert host_sam.get_vars()["v4_first_ip"] == "172.22.4.187"
# check IPv6 address
assert "2000:a001::b9ff:feae:aa0d" == host_zabbix.get_vars()["v6_first_ip"]
assert host_zabbix.get_vars()["v6_first_ip"] == "2000:a001::b9ff:feae:aa0d"
# check ansible_hosts
assert "172.22.4.187" == host_sam.get_vars()["ansible_host"]
assert "185.165.1.1" == host_zabbix.get_vars()["ansible_host"]
assert "185.165.1.3" == host_gitlab.get_vars()["ansible_host"]
assert host_sam.get_vars()["ansible_host"] == "172.22.4.187"
assert host_zabbix.get_vars()["ansible_host"] == "185.165.1.1"
assert host_gitlab.get_vars()["ansible_host"] == "185.165.1.3"
# check for custom ssh port
assert "8822" == host_gitlab.get_vars()["ansible_port"]
assert host_gitlab.get_vars()["ansible_port"] == "8822"

View file

@ -4,7 +4,6 @@
from __future__ import annotations
import sys
import unittest
from ansible_collections.community.general.plugins.module_utils.hwc_utils import HwcModuleException, navigate_value
@ -14,10 +13,6 @@ class HwcUtilsTestCase(unittest.TestCase):
def setUp(self):
super().setUp()
# Add backward compatibility
if sys.version_info < (3, 0):
self.assertRaisesRegex = self.assertRaisesRegexp
def test_navigate_value(self):
value = {
"foo": {

View file

@ -44,7 +44,7 @@ def test_cause_changes_deco(deco_args, expect_exception, expect_changed):
mh = MockMH()
if expect_exception:
with pytest.raises(Exception):
with pytest.raises(ZeroDivisionError):
mh.div_(1, 0)
else:
mh.div_(9, 3)

View file

@ -32,7 +32,7 @@ def test_op_config(mocker, os_expanduser):
mocker.patch("os.path.exists", side_effect=[False, True])
op_config = OnePasswordConfig()
assert "/home/testuser/.config/op/config" == op_config.config_file_path
assert op_config.config_file_path == "/home/testuser/.config/op/config"
def test_op_no_config(mocker, os_expanduser):

View file

@ -47,7 +47,7 @@ class OneViewBaseTest:
def test_main_function_should_call_run_method(self, testing_module, mock_ansible_module):
mock_ansible_module.params = {"config": "config.json"}
main_func = getattr(testing_module, "main")
main_func = testing_module.main
with patch.object(self.testing_class, "run") as mock_run:
main_func()
@ -125,7 +125,7 @@ class OneViewBaseTestCase:
def test_main_function_should_call_run_method(self):
self.mock_ansible_module.params = {"config": "config.json"}
main_func = getattr(self.testing_module, "main")
main_func = self.testing_module.main
with patch.object(self.testing_class, "run") as mock_run:
main_func()

View file

@ -11,7 +11,6 @@ from ansible_collections.community.general.plugins.modules import interfaces_fil
from shutil import copyfile, move
import difflib
import inspect
import io
import json
import os
import re
@ -81,7 +80,7 @@ class TestInterfacesFileModule(unittest.TestCase):
goldenstring = string
goldenData = ifaces
if not os.path.isfile(testfilepath):
with io.open(testfilepath, "wb") as f:
with open(testfilepath, "wb") as f:
f.write(string.encode())
else:
with open(testfilepath, "r") as goldenfile:
@ -94,7 +93,7 @@ class TestInterfacesFileModule(unittest.TestCase):
string += "\n"
goldenstring = string
if not os.path.isfile(testfilepath):
f = io.open(testfilepath, "wb")
f = open(testfilepath, "wb")
f.write(string.encode())
f.close()
else:

View file

@ -12,17 +12,8 @@ from unittest.mock import (
mock_open,
)
import builtins
import json
import sys
if sys.version_info[0] == 3:
import builtins
open_path = "builtins.open"
else:
import __builtin__ as builtins
open_path = "__builtin__.open"
def test_validate_file_exist_passes_when_file_exists():
@ -251,7 +242,7 @@ def test_read_privateKey_returns_trimmed_contents():
module.params = {"private_key_path": "/fake/path/key.pem"}
mocked_file = mock_open(read_data="\n \t -----BEGIN PRIVATE KEY-----\nKEYDATA\n-----END PRIVATE KEY----- \n\n")
with patch(open_path, mocked_file):
with patch("builtins.open", mocked_file):
result = jenkins_credential.read_privateKey(module)
expected = "-----BEGIN PRIVATE KEY-----\nKEYDATA\n-----END PRIVATE KEY-----"
@ -264,7 +255,7 @@ def test_read_privateKey_handles_file_read_error():
module = MagicMock()
module.params = {"private_key_path": "/invalid/path.pem"}
with patch(open_path, side_effect=IOError("cannot read file")):
with patch("builtins.open", side_effect=IOError("cannot read file")):
jenkins_credential.read_privateKey(module)
module.fail_json.assert_called_once()
@ -295,7 +286,7 @@ def test_embed_file_into_body_fails_when_file_unreadable():
file_path = "/fake/path/missing.pem"
credentials = {"id": "something"}
with patch(open_path, side_effect=IOError("can't read file")):
with patch("builtins.open", side_effect=IOError("can't read file")):
jenkins_credential.embed_file_into_body(module, file_path, credentials)
module.fail_json.assert_called_once()
@ -307,7 +298,7 @@ def test_embed_file_into_body_injects_file_keys_into_credentials():
file_path = "/fake/path/file.txt"
credentials = {"id": "test"}
with patch(open_path, mock_open(read_data=b"1234")), patch("os.path.basename", return_value="file.txt"):
with patch("builtins.open", mock_open(read_data=b"1234")), patch("os.path.basename", return_value="file.txt"):
jenkins_credential.embed_file_into_body(module, file_path, credentials)
assert credentials["file"] == "file0"

View file

@ -4,7 +4,6 @@
from __future__ import annotations
import sys
from unittest.mock import patch
from ansible.module_utils.common.dict_transformations import dict_merge
@ -30,10 +29,6 @@ class TestPritunlOrg(ModuleTestCase):
super().setUp()
self.module = pritunl_org
# Add backward compatibility
if sys.version_info < (3, 2):
self.assertRegex = self.assertRegexpMatches
def tearDown(self):
super().tearDown()

View file

@ -4,7 +4,6 @@
from __future__ import annotations
import sys
from unittest.mock import patch
from ansible_collections.community.general.plugins.modules import (
@ -27,10 +26,6 @@ class TestPritunlOrgInfo(ModuleTestCase):
super().setUp()
self.module = pritunl_org_info
# Add backward compatibility
if sys.version_info < (3, 2):
self.assertRegex = self.assertRegexpMatches
def tearDown(self):
super().tearDown()

View file

@ -4,7 +4,6 @@
from __future__ import annotations
import sys
from unittest.mock import patch
from ansible.module_utils.common.dict_transformations import dict_merge
@ -42,10 +41,6 @@ class TestPritunlUser(ModuleTestCase):
super().setUp()
self.module = pritunl_user
# Add backward compatibility
if sys.version_info < (3, 2):
self.assertRegex = self.assertRegexpMatches
def tearDown(self):
super().tearDown()

View file

@ -4,7 +4,6 @@
from __future__ import annotations
import sys
from unittest.mock import patch
from ansible_collections.community.general.plugins.modules import (
@ -27,10 +26,6 @@ class TestPritunlUserInfo(ModuleTestCase):
super().setUp()
self.module = pritunl_user_info
# Add backward compatibility
if sys.version_info < (3, 2):
self.assertRegex = self.assertRegexpMatches
def tearDown(self):
super().tearDown()