2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 06:25:31 +00:00

Add utility logging functions to isctest.log

Unify the different loggers (conftest, module, test) into a single
interface. Remove the need to select the proper logger by automatically
selecting the most-specific logger currently available.

This also removes the need to use the logger/mlogger fixtures manually
and pass these around. This was especially annoying and unwieldy when
splitting the test cases into functions, because logger had to always be
passed around. Instead, it is now possible to use the
isctest.log.(debug,info,warning,error) functions.
This commit is contained in:
Tom Krizek
2024-02-15 14:47:13 +01:00
parent 52f9e6f557
commit c60975f108
6 changed files with 190 additions and 111 deletions

View File

@@ -208,8 +208,8 @@ assigned port numbers. They're also set as environment variables. These include:
Each module has a separate log which will be saved as pytest.log.txt in the Each module has a separate log which will be saved as pytest.log.txt in the
temporary directory in which the test is executed. This log includes messages temporary directory in which the test is executed. This log includes messages
for this module setup/teardown as well as any logging from the tests using the for this module setup/teardown as well as any logging from the tests. Logging
`logger` fixture. Logging level DEBUG and above will be present in this log. level DEBUG and above will be present in this log.
In general, any log messages using INFO or above will also be printed out In general, any log messages using INFO or above will also be printed out
during pytest execution. In CI, the pytest output is also saved to during pytest execution. In CI, the pytest output is also saved to
@@ -276,7 +276,6 @@ is possible to pass fixtures to the test function by specifying their name as
function arguments. Fixtures are used to provide context to the tests, e.g.: function arguments. Fixtures are used to provide context to the tests, e.g.:
- `ports` is a dictionary with assigned port numbers - `ports` is a dictionary with assigned port numbers
- `logger` is a test-specific logging object
### tests_sh_*.py ### tests_sh_*.py

View File

@@ -10,7 +10,6 @@
# information regarding copyright ownership. # information regarding copyright ownership.
from functools import partial from functools import partial
import logging
import os import os
from pathlib import Path from pathlib import Path
import re import re
@@ -47,7 +46,6 @@ else:
# ----------------------- Globals definition ----------------------------- # ----------------------- Globals definition -----------------------------
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "") XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
FILE_DIR = os.path.abspath(Path(__file__).parent) FILE_DIR = os.path.abspath(Path(__file__).parent)
ENV_RE = re.compile(b"([^=]+)=(.*)") ENV_RE = re.compile(b"([^=]+)=(.*)")
@@ -63,7 +61,6 @@ PRIORITY_TESTS = [
"upforwd/", "upforwd/",
] ]
PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS)) PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
CONFTEST_LOGGER = logging.getLogger("conftest")
SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system" SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system"
SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py") SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
@@ -71,38 +68,6 @@ SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
# ---------------------- Module initialization --------------------------- # ---------------------- Module initialization ---------------------------
def init_pytest_conftest_logger(conftest_logger):
"""
This initializes the conftest logger which is used for pytest setup
and configuration before tests are executed -- aka any logging in this
file that is _not_ module-specific.
"""
conftest_logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler("pytest.conftest.log.txt")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
conftest_logger.addHandler(file_handler)
init_pytest_conftest_logger(CONFTEST_LOGGER)
def avoid_duplicated_logs():
"""
Remove direct root logger output to file descriptors.
This default is causing duplicates because all our messages go through
regular logging as well and are thus displayed twice.
"""
todel = []
for handler in logging.root.handlers:
if handler.__class__ == logging.StreamHandler:
# Beware: As for pytest 7.2.2, LiveLogging and LogCapture
# handlers inherit from logging.StreamHandler
todel.append(handler)
for handler in todel:
logging.root.handlers.remove(handler)
def parse_env(env_bytes): def parse_env(env_bytes):
"""Parse the POSIX env format into Python dictionary.""" """Parse the POSIX env format into Python dictionary."""
out = {} out = {}
@@ -127,7 +92,7 @@ def get_env_bytes(cmd):
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
) )
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
CONFTEST_LOGGER.error("failed to get shell env: %s", exc) isctest.log.error("failed to get shell env: %s", exc)
raise exc raise exc
env_bytes = proc.stdout env_bytes = proc.stdout
return parse_env(env_bytes) return parse_env(env_bytes)
@@ -137,7 +102,7 @@ def get_env_bytes(cmd):
# FUTURE: Remove conf.sh entirely and define all variables in pytest only. # FUTURE: Remove conf.sh entirely and define all variables in pytest only.
CONF_ENV = get_env_bytes(". ./conf.sh && env") CONF_ENV = get_env_bytes(". ./conf.sh && env")
os.environb.update(CONF_ENV) os.environb.update(CONF_ENV)
CONFTEST_LOGGER.debug("variables in env: %s", ", ".join([str(key) for key in CONF_ENV])) isctest.log.debug("variables in env: %s", ", ".join([str(key) for key in CONF_ENV]))
# --------------------------- pytest hooks ------------------------------- # --------------------------- pytest hooks -------------------------------
@@ -161,7 +126,7 @@ def pytest_configure(config):
try: try:
import xdist.scheduler.loadscope # pylint: disable=unused-import import xdist.scheduler.loadscope # pylint: disable=unused-import
except ImportError: except ImportError:
CONFTEST_LOGGER.debug( isctest.log.debug(
"xdist is too old and does not have " "xdist is too old and does not have "
"scheduler.loadscope, disabling parallelism" "scheduler.loadscope, disabling parallelism"
) )
@@ -181,7 +146,7 @@ def pytest_ignore_collect(path):
# is otherwise and invalid character for a system test name. # is otherwise and invalid character for a system test name.
match = SYSTEM_TEST_NAME_RE.search(str(path)) match = SYSTEM_TEST_NAME_RE.search(str(path))
if match is None: if match is None:
CONFTEST_LOGGER.warning("unexpected test path: %s (ignored)", path) isctest.log.warning("unexpected test path: %s (ignored)", path)
return True return True
system_test_name = match.groups()[0] system_test_name = match.groups()[0]
return "_" in system_test_name return "_" in system_test_name
@@ -349,13 +314,6 @@ def system_test_name(request):
return path.parent.name return path.parent.name
@pytest.fixture(scope="module")
def mlogger(system_test_name):
"""Logging facility specific to this test module."""
avoid_duplicated_logs()
return logging.getLogger(system_test_name)
def _get_marker(node, marker): def _get_marker(node, marker):
try: try:
# pytest >= 4.x # pytest >= 4.x
@@ -377,15 +335,17 @@ def wait_for_zones_loaded(request, servers):
watcher.wait_for_line("all zones loaded") watcher.wait_for_line("all zones loaded")
@pytest.fixture @pytest.fixture(autouse=True)
def logger(request, system_test_name): def logger(request, system_test_name):
"""Logging facility specific to a particular test.""" """Sets up logging facility specific to a particular test."""
return logging.getLogger(f"{system_test_name}.{request.node.name}") isctest.log.init_test_logger(system_test_name, request.node.name)
yield
isctest.log.deinit_test_logger()
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def system_test_dir( def system_test_dir(
request, env, system_test_name, mlogger request, env, system_test_name
): # pylint: disable=too-many-statements,too-many-locals ): # pylint: disable=too-many-statements,too-many-locals
""" """
Temporary directory for executing the test. Temporary directory for executing the test.
@@ -405,7 +365,7 @@ def system_test_dir(
except AttributeError: except AttributeError:
# This may happen if pytest execution is interrupted and # This may happen if pytest execution is interrupted and
# pytest_runtest_makereport() is never called. # pytest_runtest_makereport() is never called.
mlogger.debug("can't obtain test results, test run was interrupted") isctest.log.debug("can't obtain test results, test run was interrupted")
return "error" return "error"
test_results = { test_results = {
node.nodeid: all_test_results[node.nodeid] node.nodeid: all_test_results[node.nodeid]
@@ -415,10 +375,10 @@ def system_test_dir(
assert len(test_results) assert len(test_results)
messages = [] messages = []
for node, result in test_results.items(): for node, result in test_results.items():
mlogger.debug("%s %s", result.outcome.upper(), node) isctest.log.debug("%s %s", result.outcome.upper(), node)
messages.extend(result.messages) messages.extend(result.messages)
for message in messages: for message in messages:
mlogger.debug("\n" + message) isctest.log.debug("\n" + message)
failed = any(res.outcome == "failed" for res in test_results.values()) failed = any(res.outcome == "failed" for res in test_results.values())
skipped = any(res.outcome == "skipped" for res in test_results.values()) skipped = any(res.outcome == "skipped" for res in test_results.values())
if failed: if failed:
@@ -448,60 +408,54 @@ def system_test_dir(
unlink(symlink_dst) unlink(symlink_dst)
symlink_dst.symlink_to(os.path.relpath(testdir, start=system_test_root)) symlink_dst.symlink_to(os.path.relpath(testdir, start=system_test_root))
# Configure logger to write to a file inside the temporary test directory isctest.log.init_module_logger(system_test_name, testdir)
mlogger.handlers.clear()
mlogger.setLevel(logging.DEBUG)
handler = logging.FileHandler(testdir / "pytest.log.txt", mode="w")
formatter = logging.Formatter(LOG_FORMAT)
handler.setFormatter(formatter)
mlogger.addHandler(handler)
# System tests are meant to be executed from their directory - switch to it. # System tests are meant to be executed from their directory - switch to it.
old_cwd = os.getcwd() old_cwd = os.getcwd()
os.chdir(testdir) os.chdir(testdir)
mlogger.debug("switching to tmpdir: %s", testdir) isctest.log.info("switching to tmpdir: %s", testdir)
try: try:
yield testdir # other fixtures / tests will execute here yield testdir # other fixtures / tests will execute here
finally: finally:
os.chdir(old_cwd) os.chdir(old_cwd)
mlogger.debug("changed workdir to: %s", old_cwd) isctest.log.debug("changed workdir to: %s", old_cwd)
result = get_test_result() result = get_test_result()
# Clean temporary dir unless it should be kept # Clean temporary dir unless it should be kept
keep = False keep = False
if request.config.getoption("--noclean"): if request.config.getoption("--noclean"):
mlogger.debug( isctest.log.debug(
"--noclean requested, keeping temporary directory %s", testdir "--noclean requested, keeping temporary directory %s", testdir
) )
keep = True keep = True
elif result == "failed": elif result == "failed":
mlogger.debug( isctest.log.debug(
"test failure detected, keeping temporary directory %s", testdir "test failure detected, keeping temporary directory %s", testdir
) )
keep = True keep = True
elif not request.node.stash[FIXTURE_OK]: elif not request.node.stash[FIXTURE_OK]:
mlogger.debug( isctest.log.debug(
"test setup/teardown issue detected, keeping temporary directory %s", "test setup/teardown issue detected, keeping temporary directory %s",
testdir, testdir,
) )
keep = True keep = True
if keep: if keep:
mlogger.info( isctest.log.info(
"test artifacts in: %s", symlink_dst.relative_to(system_test_root) "test artifacts in: %s", symlink_dst.relative_to(system_test_root)
) )
else: else:
mlogger.debug("deleting temporary directory") isctest.log.debug("deleting temporary directory")
handler.flush()
handler.close() isctest.log.deinit_module_logger()
if not keep:
shutil.rmtree(testdir) shutil.rmtree(testdir)
unlink(symlink_dst) unlink(symlink_dst)
def _run_script( # pylint: disable=too-many-arguments def _run_script( # pylint: disable=too-many-arguments
env, env,
mlogger,
system_test_dir: Path, system_test_dir: Path,
interpreter: str, interpreter: str,
script: str, script: str,
@@ -518,8 +472,8 @@ def _run_script( # pylint: disable=too-many-arguments
cwd = os.getcwd() cwd = os.getcwd()
if not path.exists(): if not path.exists():
raise FileNotFoundError(f"script {script} not found in {cwd}") raise FileNotFoundError(f"script {script} not found in {cwd}")
mlogger.debug("running script: %s %s %s", interpreter, script, " ".join(args)) isctest.log.debug("running script: %s %s %s", interpreter, script, " ".join(args))
mlogger.debug(" workdir: %s", cwd) isctest.log.debug(" workdir: %s", cwd)
returncode = 1 returncode = 1
cmd = [interpreter, script] + args cmd = [interpreter, script] + args
@@ -534,24 +488,24 @@ def _run_script( # pylint: disable=too-many-arguments
) as proc: ) as proc:
if proc.stdout: if proc.stdout:
for line in proc.stdout: for line in proc.stdout:
mlogger.info(" %s", line.rstrip("\n")) isctest.log.info(" %s", line.rstrip("\n"))
proc.communicate() proc.communicate()
returncode = proc.returncode returncode = proc.returncode
if returncode: if returncode:
raise subprocess.CalledProcessError(returncode, cmd) raise subprocess.CalledProcessError(returncode, cmd)
mlogger.debug(" exited with %d", returncode) isctest.log.debug(" exited with %d", returncode)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def shell(env, system_test_dir, mlogger): def shell(env, system_test_dir):
"""Function to call a shell script with arguments.""" """Function to call a shell script with arguments."""
return partial(_run_script, env, mlogger, system_test_dir, env["SHELL"]) return partial(_run_script, env, system_test_dir, env["SHELL"])
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def perl(env, system_test_dir, mlogger): def perl(env, system_test_dir):
"""Function to call a perl script with arguments.""" """Function to call a perl script with arguments."""
return partial(_run_script, env, mlogger, system_test_dir, env["PERL"]) return partial(_run_script, env, system_test_dir, env["PERL"])
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@@ -568,7 +522,6 @@ def run_tests_sh(system_test_dir, shell):
def system_test( # pylint: disable=too-many-arguments,too-many-statements def system_test( # pylint: disable=too-many-arguments,too-many-statements
request, request,
env: Dict[str, str], env: Dict[str, str],
mlogger,
system_test_dir, system_test_dir,
shell, shell,
perl, perl,
@@ -599,7 +552,7 @@ def system_test( # pylint: disable=too-many-arguments,too-many-statements
try: try:
perl("testsock.pl", ["-p", env["PORT"]]) perl("testsock.pl", ["-p", env["PORT"]])
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
mlogger.error("testsock.pl: exited with code %d", exc.returncode) isctest.log.error("testsock.pl: exited with code %d", exc.returncode)
pytest.skip("Network interface aliases not set up.") pytest.skip("Network interface aliases not set up.")
def check_prerequisites(): def check_prerequisites():
@@ -616,21 +569,21 @@ def system_test( # pylint: disable=too-many-arguments,too-many-statements
except FileNotFoundError: except FileNotFoundError:
pass # setup.sh is optional pass # setup.sh is optional
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
mlogger.error("Failed to run test setup") isctest.log.error("Failed to run test setup")
pytest.fail(f"setup.sh exited with {exc.returncode}") pytest.fail(f"setup.sh exited with {exc.returncode}")
def start_servers(): def start_servers():
try: try:
perl("start.pl", ["--port", env["PORT"], system_test_dir.name]) perl("start.pl", ["--port", env["PORT"], system_test_dir.name])
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
mlogger.error("Failed to start servers") isctest.log.error("Failed to start servers")
pytest.fail(f"start.pl exited with {exc.returncode}") pytest.fail(f"start.pl exited with {exc.returncode}")
def stop_servers(): def stop_servers():
try: try:
perl("stop.pl", [system_test_dir.name]) perl("stop.pl", [system_test_dir.name])
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
mlogger.error("Failed to stop servers") isctest.log.error("Failed to stop servers")
get_core_dumps() get_core_dumps()
pytest.fail(f"stop.pl exited with {exc.returncode}") pytest.fail(f"stop.pl exited with {exc.returncode}")
@@ -638,13 +591,13 @@ def system_test( # pylint: disable=too-many-arguments,too-many-statements
try: try:
shell("get_core_dumps.sh", [system_test_dir.name]) shell("get_core_dumps.sh", [system_test_dir.name])
except subprocess.CalledProcessError as exc: except subprocess.CalledProcessError as exc:
mlogger.error("Found core dumps or sanitizer reports") isctest.log.error("Found core dumps or sanitizer reports")
pytest.fail(f"get_core_dumps.sh exited with {exc.returncode}") pytest.fail(f"get_core_dumps.sh exited with {exc.returncode}")
os.environ.update(env) # Ensure pytests have the same env vars as shell tests. os.environ.update(env) # Ensure pytests have the same env vars as shell tests.
mlogger.info(f"test started: {request.node.name}") isctest.log.info(f"test started: {request.node.name}")
port = int(env["PORT"]) port = int(env["PORT"])
mlogger.info("using port range: <%d, %d>", port, port + PORTS_PER_TEST - 1) isctest.log.info("using port range: <%d, %d>", port, port + PORTS_PER_TEST - 1)
if not hasattr(request.node, "stash"): # compatibility with pytest<7.0.0 if not hasattr(request.node, "stash"): # compatibility with pytest<7.0.0
request.node.stash = {} # use regular dict instead of pytest.Stash request.node.stash = {} # use regular dict instead of pytest.Stash
@@ -662,17 +615,17 @@ def system_test( # pylint: disable=too-many-arguments,too-many-statements
setup_test() setup_test()
try: try:
start_servers() start_servers()
mlogger.debug("executing test(s)") isctest.log.debug("executing test(s)")
yield yield
finally: finally:
mlogger.debug("test(s) finished") isctest.log.debug("test(s) finished")
stop_servers() stop_servers()
get_core_dumps() get_core_dumps()
request.node.stash[FIXTURE_OK] = True request.node.stash[FIXTURE_OK] = True
@pytest.fixture @pytest.fixture
def servers(ports, logger, system_test_dir): def servers(ports, system_test_dir):
instances = {} instances = {}
for entry in system_test_dir.rglob("*"): for entry in system_test_dir.rglob("*"):
if entry.is_dir(): if entry.is_dir():
@@ -682,7 +635,7 @@ def servers(ports, logger, system_test_dir):
named_ports = isctest.instance.NamedPorts( named_ports = isctest.instance.NamedPorts(
dns=int(ports["PORT"]), rndc=int(ports["CONTROLPORT"]) dns=int(ports["PORT"]), rndc=int(ports["CONTROLPORT"])
) )
instance = isctest.instance.NamedInstance(dir_name, named_ports, logger) instance = isctest.instance.NamedInstance(dir_name, named_ports)
instances[dir_name] = instance instances[dir_name] = instance
except ValueError: except ValueError:
continue continue

View File

@@ -18,7 +18,7 @@ import os
import re import re
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .log import LogFile, WatchLogFromStart, WatchLogFromHere from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere
class NamedPorts(NamedTuple): class NamedPorts(NamedTuple):
@@ -64,7 +64,7 @@ class NamedInstance:
self.ports = ports self.ports = ports
self.log = LogFile(os.path.join(identifier, "named.run")) self.log = LogFile(os.path.join(identifier, "named.run"))
self._rndc_executor = rndc_executor or RNDCBinaryExecutor() self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger or logging.getLogger() self._rndc_logger = rndc_logger
@staticmethod @staticmethod
def _identifier_to_ip(identifier: str) -> str: def _identifier_to_ip(identifier: str) -> str:
@@ -156,12 +156,13 @@ class NamedInstance:
current working directory. current working directory.
""" """
fmt = '%(ip)s: "%(command)s"\n%(separator)s\n%(response)s%(separator)s' fmt = '%(ip)s: "%(command)s"\n%(separator)s\n%(response)s%(separator)s'
self._rndc_logger.info( args = {
fmt, "ip": self.ip,
{ "command": command,
"ip": self.ip, "separator": "-" * 80,
"command": command, "response": response,
"separator": "-" * 80, }
"response": response, if self._rndc_logger is None:
}, info(fmt, args)
) else:
self._rndc_logger.info(fmt, args)

View File

@@ -9,4 +9,16 @@
# See the COPYRIGHT file distributed with this work for additional # See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership. # information regarding copyright ownership.
from .basic import (
deinit_module_logger,
deinit_test_logger,
init_module_logger,
init_test_logger,
debug,
info,
warning,
error,
critical,
)
from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere

View File

@@ -0,0 +1,113 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import logging
from pathlib import Path
from typing import Dict, Optional
CONFTEST_LOGGER = logging.getLogger("conftest")
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
LOGGERS = {
"conftest": None,
"module": None,
"test": None,
} # type: Dict[str, Optional[logging.Logger]]
def init_conftest_logger():
"""
This initializes the conftest logger which is used for pytest setup
and configuration before tests are executed -- aka any logging in this
file that is _not_ module-specific.
"""
LOGGERS["conftest"] = logging.getLogger("conftest")
LOGGERS["conftest"].setLevel(logging.DEBUG)
file_handler = logging.FileHandler("pytest.conftest.log.txt")
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
LOGGERS["conftest"].addHandler(file_handler)
def avoid_duplicated_logs():
"""
Remove direct root logger output to file descriptors.
This default is causing duplicates because all our messages go through
regular logging as well and are thus displayed twice.
"""
todel = []
for handler in logging.root.handlers:
if handler.__class__ == logging.StreamHandler:
# Beware: As for pytest 7.2.2, LiveLogging and LogCapture
# handlers inherit from logging.StreamHandler
todel.append(handler)
for handler in todel:
logging.root.handlers.remove(handler)
init_conftest_logger()
avoid_duplicated_logs()
def init_module_logger(system_test_name: str, testdir: Path):
logger = logging.getLogger(system_test_name)
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(testdir / "pytest.log.txt", mode="w")
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger.addHandler(handler)
LOGGERS["module"] = logger
def deinit_module_logger():
for handler in LOGGERS["module"].handlers:
handler.flush()
handler.close()
LOGGERS["module"] = None
def init_test_logger(system_test_name: str, test_name: str):
LOGGERS["test"] = logging.getLogger(f"{system_test_name}.{test_name}")
def deinit_test_logger():
LOGGERS["test"] = None
def log(lvl: int, msg: str, *args, **kwargs):
"""Log message with the most-specific logger currently available."""
logger = LOGGERS["test"]
if logger is None:
logger = LOGGERS["module"]
if logger is None:
logger = LOGGERS["conftest"]
assert logger is not None
logger.log(lvl, msg, *args, **kwargs)
def debug(msg: str, *args, **kwargs):
log(logging.DEBUG, msg, *args, **kwargs)
def info(msg: str, *args, **kwargs):
log(logging.INFO, msg, *args, **kwargs)
def warning(msg: str, *args, **kwargs):
log(logging.WARNING, msg, *args, **kwargs)
def error(msg: str, *args, **kwargs):
log(logging.ERROR, msg, *args, **kwargs)
def critical(msg: str, *args, **kwargs):
log(logging.CRITICAL, msg, *args, **kwargs)

View File

@@ -13,9 +13,12 @@ import concurrent.futures
import os import os
import subprocess import subprocess
import time import time
import dns.query import dns.query
import dns.update import dns.update
import isctest
def rndc_loop(test_state, server): def rndc_loop(test_state, server):
rndc = os.getenv("RNDC") rndc = os.getenv("RNDC")
@@ -37,7 +40,7 @@ def rndc_loop(test_state, server):
time.sleep(1) time.sleep(1)
def update_zone(test_state, zone, named_port, logger): def update_zone(test_state, zone, named_port):
server = "10.53.0.2" server = "10.53.0.2"
for i in range(1000): for i in range(1000):
if test_state["finished"]: if test_state["finished"]:
@@ -48,13 +51,13 @@ def update_zone(test_state, zone, named_port, logger):
response = dns.query.udp(update, server, 10, named_port) response = dns.query.udp(update, server, 10, named_port)
assert response.rcode() == dns.rcode.NOERROR assert response.rcode() == dns.rcode.NOERROR
except dns.exception.Timeout: except dns.exception.Timeout:
logger.info(f"error: query timeout for {zone}") isctest.log.info(f"error: query timeout for {zone}")
logger.info(f"Update of {server} zone {zone} successful") isctest.log.info(f"Update of {server} zone {zone} successful")
# If the test has run to completion without named crashing, it has succeeded. # If the test has run to completion without named crashing, it has succeeded.
def test_update_stress(named_port, logger): def test_update_stress(named_port):
test_state = {"finished": False} test_state = {"finished": False}
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
@@ -63,9 +66,7 @@ def test_update_stress(named_port, logger):
updaters = [] updaters = []
for i in range(5): for i in range(5):
zone = f"zone00000{i}.example." zone = f"zone00000{i}.example."
updaters.append( updaters.append(executor.submit(update_zone, test_state, zone, named_port))
executor.submit(update_zone, test_state, zone, named_port, logger)
)
# All the update_zone() tasks are expected to complete within 5 # All the update_zone() tasks are expected to complete within 5
# minutes. If they do not, we cannot assert immediately as that will # minutes. If they do not, we cannot assert immediately as that will