1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00

[PR #11329/d549baa5 backport][stable-12] straight up: ruff format (#11330)

straight up: ruff format (#11329)

* straight up: ruff format

* Apply suggestions from code review

(cherry picked from commit d549baa5e1)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2025-12-27 16:29:02 +01:00 committed by GitHub
parent 18c53b8bd1
commit e530d2906a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 438 additions and 396 deletions

View file

@ -11,8 +11,7 @@ Keep in mind that Azure Pipelines does not enforce unique job display names (onl
It is up to pipeline authors to avoid name collisions when deviating from the recommended format.
"""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from __future__ import annotations
import os
import re
@ -24,12 +23,12 @@ def main():
"""Main program entry point."""
source_directory = sys.argv[1]
if '/ansible_collections/' in os.getcwd():
if "/ansible_collections/" in os.getcwd():
output_path = "tests/output"
else:
output_path = "test/results"
destination_directory = os.path.join(output_path, 'coverage')
destination_directory = os.path.join(output_path, "coverage")
if not os.path.exists(destination_directory):
os.makedirs(destination_directory)
@ -38,27 +37,27 @@ def main():
count = 0
for name in os.listdir(source_directory):
match = re.search('^Coverage (?P<attempt>[0-9]+) (?P<label>.+)$', name)
label = match.group('label')
attempt = int(match.group('attempt'))
match = re.search("^Coverage (?P<attempt>[0-9]+) (?P<label>.+)$", name)
label = match.group("label")
attempt = int(match.group("attempt"))
jobs[label] = max(attempt, jobs.get(label, 0))
for label, attempt in jobs.items():
name = 'Coverage {attempt} {label}'.format(label=label, attempt=attempt)
name = "Coverage {attempt} {label}".format(label=label, attempt=attempt)
source = os.path.join(source_directory, name)
source_files = os.listdir(source)
for source_file in source_files:
source_path = os.path.join(source, source_file)
destination_path = os.path.join(destination_directory, source_file + '.' + label)
destination_path = os.path.join(destination_directory, source_file + "." + label)
print('"%s" -> "%s"' % (source_path, destination_path))
shutil.copyfile(source_path, destination_path)
count += 1
print('Coverage file count: %d' % count)
print('##vso[task.setVariable variable=coverageFileCount]%d' % count)
print('##vso[task.setVariable variable=outputPath]%s' % output_path)
print("Coverage file count: %d" % count)
print("##vso[task.setVariable variable=coverageFileCount]%d" % count)
print("##vso[task.setVariable variable=outputPath]%s" % output_path)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -34,8 +34,8 @@ class Args:
def parse_args() -> Args:
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--dry-run', action='store_true')
parser.add_argument('path', type=pathlib.Path)
parser.add_argument("-n", "--dry-run", action="store_true")
parser.add_argument("path", type=pathlib.Path)
args = parser.parse_args()
@ -48,12 +48,14 @@ def parse_args() -> Args:
def process_files(directory: pathlib.Path) -> t.Tuple[CoverageFile, ...]:
processed = []
for file in directory.joinpath('reports').glob('coverage*.xml'):
name = file.stem.replace('coverage=', '')
for file in directory.joinpath("reports").glob("coverage*.xml"):
name = file.stem.replace("coverage=", "")
# Get flags from name
flags = name.replace('-powershell', '').split('=') # Drop '-powershell' suffix
flags = [flag if not flag.startswith('stub') else flag.split('-')[0] for flag in flags] # Remove "-01" from stub files
flags = name.replace("-powershell", "").split("=") # Drop '-powershell' suffix
flags = [
flag if not flag.startswith("stub") else flag.split("-")[0] for flag in flags
] # Remove "-01" from stub files
processed.append(CoverageFile(name, file, flags))
@ -64,14 +66,16 @@ def upload_files(codecov_bin: pathlib.Path, files: t.Tuple[CoverageFile, ...], d
for file in files:
cmd = [
str(codecov_bin),
'--name', file.name,
'--file', str(file.path),
"--name",
file.name,
"--file",
str(file.path),
]
for flag in file.flags:
cmd.extend(['--flags', flag])
cmd.extend(["--flags", flag])
if dry_run:
print(f'DRY-RUN: Would run command: {cmd}')
print(f"DRY-RUN: Would run command: {cmd}")
continue
subprocess.run(cmd, check=True)
@ -79,11 +83,11 @@ def upload_files(codecov_bin: pathlib.Path, files: t.Tuple[CoverageFile, ...], d
def download_file(url: str, dest: pathlib.Path, flags: int, dry_run: bool = False) -> None:
if dry_run:
print(f'DRY-RUN: Would download {url} to {dest} and set mode to {flags:o}')
print(f"DRY-RUN: Would download {url} to {dest} and set mode to {flags:o}")
return
with urllib.request.urlopen(url) as resp:
with dest.open('w+b') as f:
with dest.open("w+b") as f:
# Read data in chunks rather than all at once
shutil.copyfileobj(resp, f, 64 * 1024)
@ -92,14 +96,14 @@ def download_file(url: str, dest: pathlib.Path, flags: int, dry_run: bool = Fals
def main():
args = parse_args()
url = 'https://ansible-ci-files.s3.amazonaws.com/codecov/linux/codecov'
with tempfile.TemporaryDirectory(prefix='codecov-') as tmpdir:
codecov_bin = pathlib.Path(tmpdir) / 'codecov'
url = "https://ansible-ci-files.s3.amazonaws.com/codecov/linux/codecov"
with tempfile.TemporaryDirectory(prefix="codecov-") as tmpdir:
codecov_bin = pathlib.Path(tmpdir) / "codecov"
download_file(url, codecov_bin, 0o755, args.dry_run)
files = process_files(args.path)
upload_files(codecov_bin, files, args.dry_run)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -5,8 +5,7 @@
"""Prepends a relative timestamp to each input line from stdin and writes it to stdout."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from __future__ import annotations
import sys
import time
@ -16,14 +15,14 @@ def main():
"""Main program entry point."""
start = time.time()
sys.stdin.reconfigure(errors='surrogateescape')
sys.stdout.reconfigure(errors='surrogateescape')
sys.stdin.reconfigure(errors="surrogateescape")
sys.stdout.reconfigure(errors="surrogateescape")
for line in sys.stdin:
seconds = time.time() - start
sys.stdout.write('%02d:%02d %s' % (seconds // 60, seconds % 60, line))
sys.stdout.write("%02d:%02d %s" % (seconds // 60, seconds % 60, line))
sys.stdout.flush()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -10,7 +10,7 @@ from ruamel.yaml import YAML
def main() -> None:
yaml = YAML(typ='rt')
yaml = YAML(typ="rt")
yaml.indent(mapping=2, sequence=4, offset=2)
# Load
@ -19,7 +19,7 @@ def main() -> None:
# Dump
sio = StringIO()
yaml.dump(data, sio)
print(sio.getvalue().rstrip('\n'))
print(sio.getvalue().rstrip("\n"))
if __name__ == "__main__":

View file

@ -9,43 +9,49 @@ def callback_results_extractor(outputs_results):
results = []
for result in outputs_results:
differences = []
expected_output = result['test']['expected_output']
stdout_lines = result['stdout_lines']
expected_output = result["test"]["expected_output"]
stdout_lines = result["stdout_lines"]
for i in range(max(len(expected_output), len(stdout_lines))):
line = "line_%s" % (i + 1)
test_line = stdout_lines[i] if i < len(stdout_lines) else None
expected_lines = expected_output[i] if i < len(expected_output) else None
if not isinstance(expected_lines, str) and expected_lines is not None:
if test_line not in expected_lines:
differences.append({
'line': {
'expected_one_of': expected_lines,
'got': test_line,
differences.append(
{
"line": {
"expected_one_of": expected_lines,
"got": test_line,
}
}
})
)
else:
if test_line != expected_lines:
differences.append({
'line': {
'expected': expected_lines,
'got': test_line,
differences.append(
{
"line": {
"expected": expected_lines,
"got": test_line,
}
}
})
results.append({
'name': result['test']['name'],
'output': {
'differences': differences,
'expected': expected_output,
'got': stdout_lines,
},
})
)
results.append(
{
"name": result["test"]["name"],
"output": {
"differences": differences,
"expected": expected_output,
"got": stdout_lines,
},
}
)
return results
class FilterModule:
''' Jinja2 compat filters '''
"""Jinja2 compat filters"""
def filters(self):
return {
'callback_results_extractor': callback_results_extractor,
"callback_results_extractor": callback_results_extractor,
}

View file

@ -11,20 +11,21 @@ from ansible.plugins.action import ActionBase
try:
from ansible.utils.datatag import trust_value as _trust_value
except ImportError:
def _trust_value(input):
return input
class ActionModule(ActionBase):
''' Fail with custom message '''
"""Fail with custom message"""
_requires_connection = False
_VALID_ARGS = frozenset(('msg', 'that'))
_VALID_ARGS = frozenset(("msg", "that"))
def _make_safe(self, text):
# A simple str(text) won't do it since AnsibleUnsafeText is clever :-)
return ''.join(chr(ord(x)) for x in text)
return "".join(chr(ord(x)) for x in text)
def run(self, tmp=None, task_vars=None):
if task_vars is None:
@ -33,18 +34,18 @@ class ActionModule(ActionBase):
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
if 'that' not in self._task.args:
if "that" not in self._task.args:
raise AnsibleError('conditional required in "that" string')
fail_msg = 'Assertion failed'
success_msg = 'All assertions passed'
fail_msg = "Assertion failed"
success_msg = "All assertions passed"
thats = self._task.args['that']
thats = self._task.args["that"]
result['_ansible_verbose_always'] = True
result["_ansible_verbose_always"] = True
for that in thats:
if hasattr(self._templar, 'evaluate_conditional'):
if hasattr(self._templar, "evaluate_conditional"):
trusted_that = _trust_value(that) if _trust_value else that
test_result = self._templar.evaluate_conditional(conditional=trusted_that)
else:
@ -52,14 +53,14 @@ class ActionModule(ActionBase):
cond.when = [str(self._make_safe(that))]
test_result = cond.evaluate_conditional(templar=self._templar, all_vars=task_vars)
if not test_result:
result['failed'] = True
result['evaluated_to'] = test_result
result['assertion'] = that
result["failed"] = True
result["evaluated_to"] = test_result
result["assertion"] = that
result['msg'] = fail_msg
result["msg"] = fail_msg
return result
result['changed'] = False
result['msg'] = success_msg
result["changed"] = False
result["msg"] = success_msg
return result

View file

@ -37,17 +37,19 @@ def main():
info = None
arg_formats = {}
for arg, fmt_spec in p['arg_formats'].items():
func = getattr(fmt, fmt_spec['func'])
for arg, fmt_spec in p["arg_formats"].items():
func = getattr(fmt, fmt_spec["func"])
args = fmt_spec.get("args", [])
arg_formats[arg] = func(*args)
try:
runner = CmdRunner(module, [module.params["cmd"], '--'], arg_formats=arg_formats, path_prefix=module.params["path_prefix"])
runner = CmdRunner(
module, [module.params["cmd"], "--"], arg_formats=arg_formats, path_prefix=module.params["path_prefix"]
)
with runner.context(p['arg_order'], check_mode_skip=p['check_mode_skip']) as ctx:
result = ctx.run(**p['arg_values'])
with runner.context(p["arg_order"], check_mode_skip=p["check_mode_skip"]) as ctx:
result = ctx.run(**p["arg_values"])
info = ctx.run_info
check = "check"
rc, out, err = result if result is not None else (None, None, None)
@ -57,5 +59,5 @@ def main():
module.fail_json(rc=1, module_stderr=traceback.format_exc(), msg="Module crashed with exception")
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -3,4 +3,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# single_app_project/core/settings.py
SECRET_KEY = 'testtesttesttesttest'
SECRET_KEY = "testtesttesttesttest"

View file

@ -11,10 +11,11 @@ import sys
def main():
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'single_app_project.core.settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "single_app_project.core.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -12,7 +12,7 @@ import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'p1.settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "p1.settings")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
@ -24,5 +24,5 @@ def main():
execute_from_command_line(sys.argv)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -27,7 +27,7 @@ BASE_DIR = Path(__file__).resolve().parent.parent
# See https://docs.djangoproject.com/en/3.1/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '%g@gyhl*q@@g(_ab@t^76dao^#b9-v8mw^50)x_bv6wpl+mukj'
SECRET_KEY = "%g@gyhl*q@@g(_ab@t^76dao^#b9-v8mw^50)x_bv6wpl+mukj"
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
@ -38,52 +38,52 @@ ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = 'p1.urls'
ROOT_URLCONF = "p1.urls"
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
},
]
WSGI_APPLICATION = 'p1.wsgi.application'
WSGI_APPLICATION = "p1.wsgi.application"
# Database
# https://docs.djangoproject.com/en/3.1/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db.sqlite3",
}
}
@ -93,16 +93,16 @@ DATABASES = {
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]
@ -110,9 +110,9 @@ AUTH_PASSWORD_VALIDATORS = [
# Internationalization
# https://docs.djangoproject.com/en/3.1/topics/i18n/
LANGUAGE_CODE = 'en-us'
LANGUAGE_CODE = "en-us"
TIME_ZONE = 'UTC'
TIME_ZONE = "UTC"
USE_I18N = True
@ -124,8 +124,8 @@ USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.1/howto/static-files/
STATIC_URL = '/static/'
STATIC_ROOT = '/tmp/django-static'
STATIC_URL = "/static/"
STATIC_ROOT = "/tmp/django-static"
if "DJANGO_ANSIBLE_RAISE" in os.environ:
raise ValueError("DJANGO_ANSIBLE_RAISE={0}".format(os.environ["DJANGO_ANSIBLE_RAISE"]))

View file

@ -23,5 +23,5 @@ from django.contrib import admin
from django.urls import path
urlpatterns = [
path('admin/', admin.site.urls),
path("admin/", admin.site.urls),
]

View file

@ -3,4 +3,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# single_app_project/core/settings.py
SECRET_KEY = 'testtesttesttesttest'
SECRET_KEY = "testtesttesttesttest"

View file

@ -11,10 +11,11 @@ import sys
def main():
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'single_app_project.core.settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "single_app_project.core.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -12,7 +12,7 @@ import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'p1.settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "p1.settings")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
@ -24,5 +24,5 @@ def main():
execute_from_command_line(sys.argv)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -27,7 +27,7 @@ BASE_DIR = Path(__file__).resolve().parent.parent
# See https://docs.djangoproject.com/en/3.1/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '%g@gyhl*q@@g(_ab@t^76dao^#b9-v8mw^50)x_bv6wpl+mukj'
SECRET_KEY = "%g@gyhl*q@@g(_ab@t^76dao^#b9-v8mw^50)x_bv6wpl+mukj"
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
@ -38,52 +38,52 @@ ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = 'p1.urls'
ROOT_URLCONF = "p1.urls"
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
},
]
WSGI_APPLICATION = 'p1.wsgi.application'
WSGI_APPLICATION = "p1.wsgi.application"
# Database
# https://docs.djangoproject.com/en/3.1/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db.sqlite3",
}
}
@ -93,16 +93,16 @@ DATABASES = {
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]
@ -110,9 +110,9 @@ AUTH_PASSWORD_VALIDATORS = [
# Internationalization
# https://docs.djangoproject.com/en/3.1/topics/i18n/
LANGUAGE_CODE = 'en-us'
LANGUAGE_CODE = "en-us"
TIME_ZONE = 'UTC'
TIME_ZONE = "UTC"
USE_I18N = True
@ -124,8 +124,8 @@ USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.1/howto/static-files/
STATIC_URL = '/static/'
STATIC_ROOT = '/tmp/django-static'
STATIC_URL = "/static/"
STATIC_ROOT = "/tmp/django-static"
if "DJANGO_ANSIBLE_RAISE" in os.environ:
raise ValueError("DJANGO_ANSIBLE_RAISE={0}".format(os.environ["DJANGO_ANSIBLE_RAISE"]))

View file

@ -23,5 +23,5 @@ from django.contrib import admin
from django.urls import path
urlpatterns = [
path('admin/', admin.site.urls),
path("admin/", admin.site.urls),
]

View file

@ -20,7 +20,7 @@ except ImportError:
# Argument parsing
if len(sys.argv) != 4:
print('Syntax: {0} <bind> <port> <path>'.format(sys.argv[0]))
print("Syntax: {0} <bind> <port> <path>".format(sys.argv[0]))
sys.exit(-1)
HOST, PORT, PATH = sys.argv[1:4]
@ -34,16 +34,16 @@ class Handler(SimpleHTTPRequestHandler):
# to support using another base directory than CWD.
# abandon query parameters
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
path = path.split("?", 1)[0]
path = path.split("#", 1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
trailing_slash = path.rstrip().endswith('/')
trailing_slash = path.rstrip().endswith("/")
try:
path = unquote(path, errors='surrogatepass')
path = unquote(path, errors="surrogatepass")
except (UnicodeDecodeError, TypeError) as exc:
path = unquote(path)
path = posixpath.normpath(path)
words = path.split('/')
words = path.split("/")
words = filter(None, words)
path = PATH
for word in words:
@ -52,7 +52,7 @@ class Handler(SimpleHTTPRequestHandler):
continue
path = os.path.join(path, word)
if trailing_slash:
path += '/'
path += "/"
return path

View file

@ -16,15 +16,17 @@ try:
except ModuleNotFoundError:
from http.server import HTTPServer, SimpleHTTPRequestHandler
httpd = HTTPServer(('localhost', port), SimpleHTTPRequestHandler)
httpd = HTTPServer(("localhost", port), SimpleHTTPRequestHandler)
try:
httpd.socket = ssl.wrap_socket(httpd.socket, server_side=True,
certfile=os.path.join(root_dir, 'cert.pem'),
keyfile=os.path.join(root_dir, 'key.pem'))
httpd.socket = ssl.wrap_socket(
httpd.socket,
server_side=True,
certfile=os.path.join(root_dir, "cert.pem"),
keyfile=os.path.join(root_dir, "key.pem"),
)
except AttributeError:
# Python 3.12 or newer:
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile=os.path.join(root_dir, 'cert.pem'),
keyfile=os.path.join(root_dir, 'key.pem'))
context.load_cert_chain(certfile=os.path.join(root_dir, "cert.pem"), keyfile=os.path.join(root_dir, "key.pem"))
httpd.socket = context.wrap_socket(httpd.socket)
httpd.handle_request()

View file

@ -10,7 +10,7 @@ import http.server
import socketserver
if __name__ == '__main__':
if __name__ == "__main__":
PORT = int(sys.argv[1])
Handler = http.server.SimpleHTTPRequestHandler
httpd = socketserver.TCPServer(("", PORT), Handler)

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: collection_module
short_description: Test collection module
@ -14,11 +14,11 @@ description:
- This is a test module in a local collection.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: collection_module
short_description: Test collection module
@ -14,11 +14,11 @@ description:
- This is a test module in a local collection.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: collection_module
short_description: Test collection module
@ -14,11 +14,11 @@ description:
- This is a test module in a local collection.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: collection_module
short_description: Test collection module
@ -14,11 +14,11 @@ description:
- This is a test module in a local collection.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: local_module
short_description: Test local module
@ -14,11 +14,11 @@ description:
- This is a test module locally next to a playbook.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -4,11 +4,12 @@
from __future__ import annotations
import lmdb
map_size = 1024 * 100
env = lmdb.open('./jp.mdb', map_size=map_size)
env = lmdb.open("./jp.mdb", map_size=map_size)
with env.begin(write=True) as txn:
txn.put('fr'.encode(), 'France'.encode())
txn.put('nl'.encode(), 'Netherlands'.encode())
txn.put('es'.encode(), 'Spain'.encode())
txn.put('be'.encode(), 'Belgium'.encode())
txn.put('lu'.encode(), 'Luxembourg'.encode())
txn.put("fr".encode(), "France".encode())
txn.put("nl".encode(), "Netherlands".encode())
txn.put("es".encode(), "Spain".encode())
txn.put("be".encode(), "Belgium".encode())
txn.put("lu".encode(), "Luxembourg".encode())

View file

@ -14,17 +14,19 @@ import sys
# Handle TLS and non-TLS support
try:
import smtpd_tls
HAS_TLS = True
except ImportError:
import smtpd
HAS_TLS = False
print('Library smtpd-tls is missing or not supported, hence starttls is NOT supported.')
print("Library smtpd-tls is missing or not supported, hence starttls is NOT supported.")
# Handle custom ports
port = '25:465'
port = "25:465"
if len(sys.argv) > 1:
port = sys.argv[1]
ports = port.split(':')
ports = port.split(":")
if len(ports) > 1:
port1, port2 = int(ports[0]), int(ports[1])
else:
@ -32,30 +34,30 @@ else:
# Handle custom certificate
basename = os.path.splitext(sys.argv[0])[0]
certfile = basename + '.crt'
certfile = basename + ".crt"
if len(sys.argv) > 2:
certfile = sys.argv[2]
# Handle custom key
keyfile = basename + '.key'
keyfile = basename + ".key"
if len(sys.argv) > 3:
keyfile = sys.argv[3]
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
if HAS_TLS and ssl_ctx is not None:
print('Using %s and %s' % (certfile, keyfile))
print("Using %s and %s" % (certfile, keyfile))
ssl_ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
print('Start SMTP server on port', port1)
smtp_server1 = smtpd_tls.DebuggingServer(('127.0.0.1', port1), None, ssl_ctx=ssl_ctx, starttls=True)
print("Start SMTP server on port", port1)
smtp_server1 = smtpd_tls.DebuggingServer(("127.0.0.1", port1), None, ssl_ctx=ssl_ctx, starttls=True)
if port2:
print('Start TLS SMTP server on port', port2)
smtp_server2 = smtpd_tls.DebuggingServer(('127.0.0.1', port2), None, ssl_ctx=ssl_ctx, starttls=False)
print("Start TLS SMTP server on port", port2)
smtp_server2 = smtpd_tls.DebuggingServer(("127.0.0.1", port2), None, ssl_ctx=ssl_ctx, starttls=False)
else:
print('Start SMTP server on port', port1)
smtp_server1 = smtpd.DebuggingServer(('127.0.0.1', port1), None) # pylint: disable=used-before-assignment
print("Start SMTP server on port", port1)
smtp_server1 = smtpd.DebuggingServer(("127.0.0.1", port1), None) # pylint: disable=used-before-assignment
if port2:
print('WARNING: TLS is NOT supported on this system, not listening on port %s.' % port2)
print("WARNING: TLS is NOT supported on this system, not listening on port %s." % port2)
asyncore.loop()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
module: mdepfail
author: "Alexei Znamensky (@russoz)"
short_description: Simple module for testing
@ -22,7 +22,7 @@ options:
c:
description: cccc
type: str
'''
"""
EXAMPLES = ""
@ -36,28 +36,28 @@ with deps.declare("nopackagewiththisname"):
class MSimple(ModuleHelper):
output_params = ('a', 'b', 'c')
output_params = ("a", "b", "c")
module = dict(
argument_spec=dict(
a=dict(type='int'),
b=dict(type='str'),
c=dict(type='str'),
a=dict(type="int"),
b=dict(type="str"),
c=dict(type="str"),
),
)
def __init_module__(self):
self.vars.set('value', None)
self.vars.set('abc', "abc", diff=True)
self.vars.set("value", None)
self.vars.set("abc", "abc", diff=True)
deps.validate(self.module)
def __run__(self):
if (0 if self.vars.a is None else self.vars.a) >= 100:
raise Exception("a >= 100")
if self.vars.c == "abc change":
self.vars['abc'] = "changed abc"
self.vars["abc"] = "changed abc"
if self.vars.a == 2:
self.vars['b'] = str(self.vars.b) * 2
self.vars['c'] = str(self.vars.c) * 2
self.vars["b"] = str(self.vars.b) * 2
self.vars["c"] = str(self.vars.c) * 2
def main():
@ -65,5 +65,5 @@ def main():
msimple.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
module: msimple
author: "Alexei Znamensky (@russoz)"
short_description: Simple module for testing
@ -22,7 +22,7 @@ options:
c:
description: cccc
type: str
'''
"""
EXAMPLES = ""
@ -33,26 +33,26 @@ from ansible_collections.community.general.plugins.module_utils.mh.deco import c
class MSimple(ModuleHelper):
output_params = ('a', 'b', 'c', 'm')
output_params = ("a", "b", "c", "m")
module = dict(
argument_spec=dict(
a=dict(type='int', default=0),
b=dict(type='str'),
c=dict(type='str'),
m=dict(type='str'),
a=dict(type="int", default=0),
b=dict(type="str"),
c=dict(type="str"),
m=dict(type="str"),
),
supports_check_mode=True,
)
def __init_module__(self):
self.vars.set('value', None)
self.vars.set('abc', "abc", diff=True)
self.vars.set("value", None)
self.vars.set("abc", "abc", diff=True)
@check_mode_skip
def process_a3_bc(self):
if self.vars.a == 3:
self.vars['b'] = str(self.vars.b) * 3
self.vars['c'] = str(self.vars.c) * 3
self.vars["b"] = str(self.vars.b) * 3
self.vars["c"] = str(self.vars.c) * 3
def __run__(self):
if self.vars.m:
@ -60,10 +60,10 @@ class MSimple(ModuleHelper):
if self.vars.a >= 100:
raise Exception("a >= 100")
if self.vars.c == "abc change":
self.vars['abc'] = "changed abc"
self.vars["abc"] = "changed abc"
if self.vars.a == 2:
self.vars['b'] = str(self.vars.b) * 2
self.vars['c'] = str(self.vars.c) * 2
self.vars["b"] = str(self.vars.b) * 2
self.vars["c"] = str(self.vars.c) * 2
self.process_a3_bc()
@ -72,5 +72,5 @@ def main():
msimple.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
module: msimpleda
author: "Alexei Znamensky (@russoz)"
short_description: Simple module for testing DeprecationAttrsMixin
@ -16,7 +16,7 @@ options:
a:
description: aaaa
type: int
'''
"""
EXAMPLES = ""
@ -24,15 +24,15 @@ RETURN = ""
from ansible_collections.community.general.plugins.module_utils.module_helper import ModuleHelper
from ansible_collections.community.general.plugins.module_utils.mh.mixins.deprecate_attrs import ( # noqa: F401, pylint: disable=unused-import
DeprecateAttrsMixin
DeprecateAttrsMixin,
)
class MSimpleDA(ModuleHelper):
output_params = ('a',)
output_params = ("a",)
module = dict(
argument_spec=dict(
a=dict(type='int'),
a=dict(type="int"),
),
)
@ -60,5 +60,5 @@ def main():
MSimpleDA.execute()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
module: mstate
author: "Alexei Znamensky (@russoz)"
short_description: State-based module for testing
@ -28,7 +28,7 @@ options:
type: str
choices: [join, b_x_a, c_x_a, both_x_a]
default: join
'''
"""
EXAMPLES = ""
@ -38,30 +38,30 @@ from ansible_collections.community.general.plugins.module_utils.module_helper im
class MState(StateModuleHelper):
output_params = ('a', 'b', 'c', 'state')
output_params = ("a", "b", "c", "state")
module = dict(
argument_spec=dict(
a=dict(type='int', required=True),
b=dict(type='str'),
c=dict(type='str'),
state=dict(type='str', choices=['join', 'b_x_a', 'c_x_a', 'both_x_a', 'nop'], default='join'),
a=dict(type="int", required=True),
b=dict(type="str"),
c=dict(type="str"),
state=dict(type="str", choices=["join", "b_x_a", "c_x_a", "both_x_a", "nop"], default="join"),
),
)
def __init_module__(self):
self.vars.set('result', "abc", diff=True)
self.vars.set("result", "abc", diff=True)
def state_join(self):
self.vars['result'] = "".join([str(self.vars.a), str(self.vars.b), str(self.vars.c)])
self.vars["result"] = "".join([str(self.vars.a), str(self.vars.b), str(self.vars.c)])
def state_b_x_a(self):
self.vars['result'] = str(self.vars.b) * self.vars.a
self.vars["result"] = str(self.vars.b) * self.vars.a
def state_c_x_a(self):
self.vars['result'] = str(self.vars.c) * self.vars.a
self.vars["result"] = str(self.vars.c) * self.vars.a
def state_both_x_a(self):
self.vars['result'] = (str(self.vars.b) + str(self.vars.c)) * self.vars.a
self.vars["result"] = (str(self.vars.b) + str(self.vars.c)) * self.vars.a
def state_nop(self):
pass
@ -72,5 +72,5 @@ def main():
mstate.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -17,9 +17,9 @@ username = sys.argv[3]
password = sys.argv[4]
if username:
url = 'http://%s:%s@127.0.0.1:9001/RPC2' % (quote(username, safe=''), quote(password, safe=''))
url = "http://%s:%s@127.0.0.1:9001/RPC2" % (quote(username, safe=""), quote(password, safe=""))
else:
url = 'http://127.0.0.1:9001/RPC2'
url = "http://127.0.0.1:9001/RPC2"
server = ServerProxy(url, verbose=True)
server.supervisor.sendProcessStdin(proc, 'import sys; print(%s); sys.stdout.flush();\n' % value)
server.supervisor.sendProcessStdin(proc, "import sys; print(%s); sys.stdout.flush();\n" % value)

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: collection_module
short_description: Test collection module
@ -14,11 +14,11 @@ description:
- This is a test module in a local collection.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@
from __future__ import annotations
DOCUMENTATION = '''
DOCUMENTATION = """
---
module: local_module
short_description: Test local module
@ -14,11 +14,11 @@ description:
- This is a test module locally next to a playbook.
author: "Felix Fontein (@felixfontein)"
options: {}
'''
"""
EXAMPLES = ''' # '''
EXAMPLES = """ # """
RETURN = ''' # '''
RETURN = """ # """
from ansible.module_utils.basic import AnsibleModule
@ -27,5 +27,5 @@ def main():
AnsibleModule(argument_spec={}).exit_json()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -3,6 +3,7 @@
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
"""Check extra collection docs with antsibull-docs."""
from __future__ import annotations
import glob
@ -13,16 +14,16 @@ import yaml
def main():
"""Main entry point."""
with open('.azure-pipelines/azure-pipelines.yml', 'rb') as f:
with open(".azure-pipelines/azure-pipelines.yml", "rb") as f:
azp = yaml.safe_load(f)
allowed_targets = set(['azp/generic/1'])
for stage in azp['stages']:
if stage['stage'].startswith(('Sanity', 'Unit', 'Generic', 'Summary')):
allowed_targets = set(["azp/generic/1"])
for stage in azp["stages"]:
if stage["stage"].startswith(("Sanity", "Unit", "Generic", "Summary")):
continue
for job in stage['jobs']:
for group in job['parameters']['groups']:
allowed_targets.add('azp/posix/{0}'.format(group))
for job in stage["jobs"]:
for group in job["parameters"]["groups"]:
allowed_targets.add("azp/posix/{0}".format(group))
paths = glob.glob("tests/integration/targets/*/aliases")
@ -30,40 +31,40 @@ def main():
for path in paths:
targets = []
skip = False
with open(path, 'r') as f:
with open(path, "r") as f:
for line in f:
if '#' in line:
line = line[:line.find('#')]
if "#" in line:
line = line[: line.find("#")]
line = line.strip()
if line.startswith('needs/'):
if line.startswith("needs/"):
continue
if line.startswith('skip/'):
if line.startswith("skip/"):
continue
if line.startswith('cloud/'):
if line.startswith("cloud/"):
continue
if line.startswith('context/'):
if line.startswith("context/"):
continue
if line in ('unsupported', 'disabled', 'hidden'):
if line in ("unsupported", "disabled", "hidden"):
skip = True
if line in ('destructive', ):
if line in ("destructive",):
continue
if '/' not in line:
if "/" not in line:
continue
targets.append(line)
if skip:
continue
if not targets:
if 'targets/setup_' in path:
if "targets/setup_" in path:
continue
print('%s: %s' % (path, 'found no targets'))
print("%s: %s" % (path, "found no targets"))
has_errors = True
for target in targets:
if target not in allowed_targets:
print('%s: %s' % (path, 'found invalid target "{0}"'.format(target)))
print("%s: %s" % (path, 'found invalid target "{0}"'.format(target)))
has_errors = True
return 1 if has_errors else 0
if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(main())

View file

@ -17,50 +17,50 @@ from voluptuous.humanize import humanize_error
IGNORE_NO_MAINTAINERS = [
'docs/docsite/rst/filter_guide.rst',
'docs/docsite/rst/filter_guide_abstract_informations.rst',
'docs/docsite/rst/filter_guide_paths.rst',
'docs/docsite/rst/filter_guide_selecting_json_data.rst',
'plugins/cache/memcached.py',
'plugins/cache/redis.py',
'plugins/callback/cgroup_memory_recap.py',
'plugins/callback/context_demo.py',
'plugins/callback/counter_enabled.py',
'plugins/callback/jabber.py',
'plugins/callback/log_plays.py',
'plugins/callback/logdna.py',
'plugins/callback/logentries.py',
'plugins/callback/null.py',
'plugins/callback/selective.py',
'plugins/callback/slack.py',
'plugins/callback/splunk.py',
'plugins/callback/yaml.py',
'plugins/inventory/nmap.py',
'plugins/inventory/virtualbox.py',
'plugins/connection/chroot.py',
'plugins/connection/iocage.py',
'plugins/connection/lxc.py',
'plugins/lookup/cartesian.py',
'plugins/lookup/chef_databag.py',
'plugins/lookup/consul_kv.py',
'plugins/lookup/credstash.py',
'plugins/lookup/cyberarkpassword.py',
'plugins/lookup/flattened.py',
'plugins/lookup/keyring.py',
'plugins/lookup/lastpass.py',
'plugins/lookup/passwordstore.py',
'plugins/lookup/shelvefile.py',
'plugins/filter/json_query.py',
'plugins/filter/random_mac.py',
"docs/docsite/rst/filter_guide.rst",
"docs/docsite/rst/filter_guide_abstract_informations.rst",
"docs/docsite/rst/filter_guide_paths.rst",
"docs/docsite/rst/filter_guide_selecting_json_data.rst",
"plugins/cache/memcached.py",
"plugins/cache/redis.py",
"plugins/callback/cgroup_memory_recap.py",
"plugins/callback/context_demo.py",
"plugins/callback/counter_enabled.py",
"plugins/callback/jabber.py",
"plugins/callback/log_plays.py",
"plugins/callback/logdna.py",
"plugins/callback/logentries.py",
"plugins/callback/null.py",
"plugins/callback/selective.py",
"plugins/callback/slack.py",
"plugins/callback/splunk.py",
"plugins/callback/yaml.py",
"plugins/inventory/nmap.py",
"plugins/inventory/virtualbox.py",
"plugins/connection/chroot.py",
"plugins/connection/iocage.py",
"plugins/connection/lxc.py",
"plugins/lookup/cartesian.py",
"plugins/lookup/chef_databag.py",
"plugins/lookup/consul_kv.py",
"plugins/lookup/credstash.py",
"plugins/lookup/cyberarkpassword.py",
"plugins/lookup/flattened.py",
"plugins/lookup/keyring.py",
"plugins/lookup/lastpass.py",
"plugins/lookup/passwordstore.py",
"plugins/lookup/shelvefile.py",
"plugins/filter/json_query.py",
"plugins/filter/random_mac.py",
]
class BotmetaCheck:
def __init__(self):
self.errors: list[str] = []
self.botmeta_filename = '.github/BOTMETA.yml'
self.list_entries = frozenset(('supershipit', 'maintainers', 'labels', 'keywords', 'notify', 'ignore'))
self.author_regex = re.compile(r'^\w.*\(@([\w-]+)\)(?![\w.])')
self.botmeta_filename = ".github/BOTMETA.yml"
self.list_entries = frozenset(("supershipit", "maintainers", "labels", "keywords", "notify", "ignore"))
self.author_regex = re.compile(r"^\w.*\(@([\w-]+)\)(?![\w.])")
def report_error(self, error: str) -> None:
self.errors.append(error)
@ -70,28 +70,28 @@ class BotmetaCheck:
try:
documentation = []
in_docs = False
with open(filename, 'r', encoding='utf-8') as f:
with open(filename, "r", encoding="utf-8") as f:
for line in f:
if line.startswith('DOCUMENTATION ='):
if line.startswith("DOCUMENTATION ="):
in_docs = True
elif line.startswith(("'''", '"""')) and in_docs:
in_docs = False
elif in_docs:
documentation.append(line)
if in_docs:
self.report_error(f'{filename}: cannot find DOCUMENTATION end')
self.report_error(f"{filename}: cannot find DOCUMENTATION end")
return []
if not documentation:
self.report_error(f'{filename}: cannot find DOCUMENTATION')
self.report_error(f"{filename}: cannot find DOCUMENTATION")
return []
data = yaml.safe_load('\n'.join(documentation))
data = yaml.safe_load("\n".join(documentation))
except Exception as e:
self.report_error(f'{filename}:0:0: Cannot load DOCUMENTATION: {e}')
self.report_error(f"{filename}:0:0: Cannot load DOCUMENTATION: {e}")
return []
author = data.get('author') or []
author = data.get("author") or []
if isinstance(author, str):
author = [author]
return author
@ -100,95 +100,118 @@ class BotmetaCheck:
m = self.author_regex.match(author)
if m:
return m.group(1)
if author == 'Ansible Core Team':
return '$team_ansible_core'
if author == "Ansible Core Team":
return "$team_ansible_core"
return None
def validate(self, filename: str, filedata: dict) -> None:
if not filename.startswith('plugins/'):
if not filename.startswith("plugins/"):
return
if filename.startswith(('plugins/doc_fragments/', 'plugins/module_utils/')):
if filename.startswith(("plugins/doc_fragments/", "plugins/module_utils/")):
return
# Compile list of all active and inactive maintainers
all_maintainers = filedata['maintainers'] + filedata['ignore']
if not filename.startswith(('plugins/action/', 'plugins/doc_fragments/', 'plugins/filter/', 'plugins/module_utils/', 'plugins/plugin_utils/')):
all_maintainers = filedata["maintainers"] + filedata["ignore"]
if not filename.startswith(
(
"plugins/action/",
"plugins/doc_fragments/",
"plugins/filter/",
"plugins/module_utils/",
"plugins/plugin_utils/",
)
):
maintainers = self.read_authors(filename)
for maintainer in maintainers:
maintainer = self.extract_author_name(maintainer)
if maintainer is not None and maintainer not in all_maintainers:
others = ', '.join(all_maintainers)
msg = f'Author {maintainer} not mentioned as active or inactive maintainer for {filename} (mentioned are: {others})'
self.report_error(f'{self.botmeta_filename}:0:0: {msg}')
others = ", ".join(all_maintainers)
msg = f"Author {maintainer} not mentioned as active or inactive maintainer for {filename} (mentioned are: {others})"
self.report_error(f"{self.botmeta_filename}:0:0: {msg}")
should_have_no_maintainer = filename in IGNORE_NO_MAINTAINERS
if not all_maintainers and not should_have_no_maintainer:
self.report_error(f'{self.botmeta_filename}:0:0: No (active or inactive) maintainer mentioned for {filename}')
self.report_error(
f"{self.botmeta_filename}:0:0: No (active or inactive) maintainer mentioned for {filename}"
)
if all_maintainers and should_have_no_maintainer:
own_path = os.path.relpath(__file__, os.getcwd())
self.report_error(f'{self.botmeta_filename}:0:0: Please remove {filename} from the ignore list of {own_path}')
self.report_error(
f"{self.botmeta_filename}:0:0: Please remove {filename} from the ignore list of {own_path}"
)
def run(self) -> None:
try:
with open(self.botmeta_filename, 'rb') as f:
with open(self.botmeta_filename, "rb") as f:
botmeta = yaml.safe_load(f)
except yaml.error.MarkedYAMLError as ex:
msg = re.sub(r'\s+', ' ', str(ex))
self.report_error('f{self.botmeta_filename}:{ex.context_mark.line + 1}:{ex.context_mark.column + 1}: YAML load failed: {msg}')
msg = re.sub(r"\s+", " ", str(ex))
self.report_error(
"f{self.botmeta_filename}:{ex.context_mark.line + 1}:{ex.context_mark.column + 1}: YAML load failed: {msg}"
)
return
except Exception as ex: # pylint: disable=broad-except
msg = re.sub(r'\s+', ' ', str(ex))
self.report_error(f'{self.botmeta_filename}:0:0: YAML load failed: {msg}')
msg = re.sub(r"\s+", " ", str(ex))
self.report_error(f"{self.botmeta_filename}:0:0: YAML load failed: {msg}")
return
# Validate schema
MacroSchema = Schema({
(str): Any(str, None),
}, extra=PREVENT_EXTRA)
FilesSchema = Schema({
(str): {
('supershipit'): str,
('support'): Any('community'),
('maintainers'): str,
('labels'): str,
('keywords'): str,
('notify'): str,
('ignore'): str,
MacroSchema = Schema(
{
(str): Any(str, None),
},
}, extra=PREVENT_EXTRA)
extra=PREVENT_EXTRA,
)
schema = Schema({
('notifications'): bool,
('automerge'): bool,
('macros'): MacroSchema,
('files'): FilesSchema,
}, extra=PREVENT_EXTRA)
FilesSchema = Schema(
{
(str): {
("supershipit"): str,
("support"): Any("community"),
("maintainers"): str,
("labels"): str,
("keywords"): str,
("notify"): str,
("ignore"): str,
},
},
extra=PREVENT_EXTRA,
)
schema = Schema(
{
("notifications"): bool,
("automerge"): bool,
("macros"): MacroSchema,
("files"): FilesSchema,
},
extra=PREVENT_EXTRA,
)
try:
schema(botmeta)
except MultipleInvalid as ex:
for error in ex.errors:
# No way to get line/column numbers
self.report_error(f'{self.botmeta_filename}:0:0: {humanize_error(botmeta, error)}')
self.report_error(f"{self.botmeta_filename}:0:0: {humanize_error(botmeta, error)}")
return
# Preprocess (substitute macros, convert to lists)
macros = botmeta.get('macros') or {}
macro_re = re.compile(r'\$([a-zA-Z_]+)')
macros = botmeta.get("macros") or {}
macro_re = re.compile(r"\$([a-zA-Z_]+)")
def convert_macros(text, macros):
def f(m):
macro = m.group(1)
replacement = (macros[macro] or '')
if macro == 'team_ansible_core':
return f'$team_ansible_core {replacement}'
replacement = macros[macro] or ""
if macro == "team_ansible_core":
return f"$team_ansible_core {replacement}"
return replacement
return macro_re.sub(f, text)
files = {}
try:
for file, filedata in (botmeta.get('files') or {}).items():
for file, filedata in (botmeta.get("files") or {}).items():
file = convert_macros(file, macros)
filedata = {k: convert_macros(v, macros) for k, v in filedata.items()}
files[file] = filedata
@ -196,15 +219,15 @@ class BotmetaCheck:
if k in self.list_entries:
filedata[k] = v.split()
except KeyError as e:
self.report_error(f'{self.botmeta_filename}:0:0: Found unknown macro {e}')
self.report_error(f"{self.botmeta_filename}:0:0: Found unknown macro {e}")
return
# Scan all files
unmatched = set(files)
for dirs in ('docs/docsite/rst', 'plugins', 'tests', 'changelogs'):
for dirs in ("docs/docsite/rst", "plugins", "tests", "changelogs"):
for dirpath, _dirnames, filenames in os.walk(dirs):
for file in sorted(filenames):
if file.endswith('.pyc'):
if file.endswith(".pyc"):
continue
filename = os.path.join(dirpath, file)
if os.path.islink(filename):
@ -217,7 +240,7 @@ class BotmetaCheck:
if file in unmatched:
unmatched.remove(file)
if not matching_files:
self.report_error(f'{self.botmeta_filename}:0:0: Did not find any entry for {filename}')
self.report_error(f"{self.botmeta_filename}:0:0: Did not find any entry for {filename}")
matching_files.sort(key=lambda kv: kv[0])
filedata = {}
@ -231,7 +254,7 @@ class BotmetaCheck:
self.validate(filename, filedata)
for file in unmatched:
self.report_error(f'{self.botmeta_filename}:0:0: Entry {file} was not used')
self.report_error(f"{self.botmeta_filename}:0:0: Entry {file} was not used")
def main() -> int:
@ -243,5 +266,5 @@ def main() -> int:
return 1 if check.errors else 0
if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(main())