2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-29 21:47:59 +00:00

chg: test: Rewrite stub system test to pytest

Merge branch 'mnowak/pytest_rewrite_stub' into 'main'

See merge request isc-projects/bind9!9190
This commit is contained in:
Michal Nowak 2025-02-04 12:38:25 +00:00
commit a1ca49683a
11 changed files with 221 additions and 248 deletions

View File

@ -9,7 +9,6 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from functools import partial
import filecmp
import os
from pathlib import Path
@ -18,7 +17,7 @@ import shutil
import subprocess
import tempfile
import time
from typing import Any, List, Optional
from typing import Any
import pytest
@ -483,46 +482,6 @@ def templates(system_test_dir: Path):
return isctest.template.TemplateEngine(system_test_dir)
def _run_script(
system_test_dir: Path,
interpreter: str,
script: str,
args: Optional[List[str]] = None,
):
"""Helper function for the shell / perl script invocations (through fixtures below)."""
if args is None:
args = []
path = Path(script)
if not path.is_absolute():
# make sure relative paths are always relative to system_dir
path = system_test_dir.parent / path
script = str(path)
cwd = os.getcwd()
if not path.exists():
raise FileNotFoundError(f"script {script} not found in {cwd}")
isctest.log.debug("running script: %s %s %s", interpreter, script, " ".join(args))
isctest.log.debug(" workdir: %s", cwd)
returncode = 1
cmd = [interpreter, script] + args
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
errors="backslashreplace",
) as proc:
if proc.stdout:
for line in proc.stdout:
isctest.log.info(" %s", line.rstrip("\n"))
proc.communicate()
returncode = proc.returncode
if returncode:
raise subprocess.CalledProcessError(returncode, cmd)
isctest.log.debug(" exited with %d", returncode)
def _get_node_path(node) -> Path:
if isinstance(node.parent, pytest.Session):
if _pytest_major_ver >= 8:
@ -533,23 +492,11 @@ def _get_node_path(node) -> Path:
@pytest.fixture(scope="module")
def shell(system_test_dir):
"""Function to call a shell script with arguments."""
return partial(_run_script, system_test_dir, os.environ["SHELL"])
@pytest.fixture(scope="module")
def perl(system_test_dir):
"""Function to call a perl script with arguments."""
return partial(_run_script, system_test_dir, os.environ["PERL"])
@pytest.fixture(scope="module")
def run_tests_sh(system_test_dir, shell):
def run_tests_sh(system_test_dir):
"""Utility function to execute tests.sh as a python test."""
def run_tests():
shell(f"{system_test_dir}/tests.sh")
isctest.run.shell(f"{system_test_dir}/tests.sh")
return run_tests
@ -559,8 +506,6 @@ def system_test(
request,
system_test_dir,
templates,
shell,
perl,
):
"""
Driver of the test setup/teardown process. Used automatically for every test module.
@ -586,14 +531,16 @@ def system_test(
def check_net_interfaces():
try:
perl("testsock.pl", ["-p", os.environ["PORT"]])
isctest.run.perl(
f"{os.environ['srcdir']}/testsock.pl", ["-p", os.environ["PORT"]]
)
except subprocess.CalledProcessError as exc:
isctest.log.error("testsock.pl: exited with code %d", exc.returncode)
pytest.skip("Network interface aliases not set up.")
def check_prerequisites():
try:
shell(f"{system_test_dir}/prereq.sh")
isctest.run.shell(f"{system_test_dir}/prereq.sh")
except FileNotFoundError:
pass # prereq.sh is optional
except subprocess.CalledProcessError:
@ -602,7 +549,7 @@ def system_test(
def setup_test():
templates.render_auto()
try:
shell(f"{system_test_dir}/setup.sh")
isctest.run.shell(f"{system_test_dir}/setup.sh")
except FileNotFoundError:
pass # setup.sh is optional
except subprocess.CalledProcessError as exc:
@ -611,14 +558,17 @@ def system_test(
def start_servers():
try:
perl("start.pl", ["--port", os.environ["PORT"], system_test_dir.name])
isctest.run.perl(
f"{os.environ['srcdir']}/start.pl",
["--port", os.environ["PORT"], system_test_dir.name],
)
except subprocess.CalledProcessError as exc:
isctest.log.error("Failed to start servers")
pytest.fail(f"start.pl exited with {exc.returncode}")
def stop_servers():
try:
perl("stop.pl", [system_test_dir.name])
isctest.run.perl(f"{os.environ['srcdir']}/stop.pl", [system_test_dir.name])
except subprocess.CalledProcessError as exc:
isctest.log.error("Failed to stop servers")
get_core_dumps()
@ -626,7 +576,9 @@ def system_test(
def get_core_dumps():
try:
shell("get_core_dumps.sh", [system_test_dir.name])
isctest.run.shell(
f"{os.environ['srcdir']}/get_core_dumps.sh", [system_test_dir.name]
)
except subprocess.CalledProcessError as exc:
isctest.log.error("Found core dumps or sanitizer reports")
pytest.fail(f"get_core_dumps.sh exited with {exc.returncode}")

View File

@ -102,6 +102,10 @@ def is_executable(cmd: str, errmsg: str) -> None:
assert executable is not None, errmsg
def notauth(message: dns.message.Message) -> None:
rcode(message, dns.rcode.NOTAUTH)
def nxdomain(message: dns.message.Message) -> None:
rcode(message, dns.rcode.NXDOMAIN)

View File

@ -11,13 +11,15 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import NamedTuple, Optional
from typing import List, NamedTuple, Optional
import logging
import os
from pathlib import Path
import re
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .run import perl
from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere
@ -48,13 +50,17 @@ class NamedInstance:
def __init__(
self,
identifier: str,
num: Optional[int] = None,
ports: Optional[NamedPorts] = None,
rndc_logger: Optional[logging.Logger] = None,
rndc_executor: Optional[RNDCExecutor] = None,
) -> None:
"""
`identifier` must be an `ns<X>` string, where `<X>` is an integer
identifier of the `named` instance this object should represent.
`identifier` is the name of the instance's directory
`num` is optional if the identifier is in a form of `ns<X>`, in which
case `<X>` is assumed to be numeric identifier; otherwise it must be
provided to assign a numeric identification to the server
`ports` is the `NamedPorts` instance listing the UDP/TCP ports on which
this `named` instance is listening for various types of traffic (both
@ -67,7 +73,13 @@ class NamedInstance:
`rndc_executor` is an object implementing the `RNDCExecutor` interface
that is used for executing RNDC commands on this `named` instance.
"""
self.ip = self._identifier_to_ip(identifier)
self.directory = Path(identifier).absolute()
if not self.directory.is_dir():
raise ValueError(f"{self.directory} isn't a directory")
self.system_test_name = self.directory.parent.name
self.identifier = identifier
self.num = self._identifier_to_num(identifier, num)
if ports is None:
ports = NamedPorts.from_env()
self.ports = ports
@ -75,12 +87,21 @@ class NamedInstance:
self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger
@property
def ip(self) -> str:
"""IPv4 address of the instance."""
return f"10.53.0.{self.num}"
@staticmethod
def _identifier_to_ip(identifier: str) -> str:
def _identifier_to_num(identifier: str, num: Optional[int] = None) -> int:
regex_match = re.match(r"^ns(?P<index>[0-9]{1,2})$", identifier)
if not regex_match:
raise ValueError("Invalid named instance identifier" + identifier)
return "10.53.0." + regex_match.group("index")
if num is None:
raise ValueError(f'Can\'t parse numeric identifier from "{identifier}"')
return num
parsed_num = int(regex_match.group("index"))
assert num is None or num == parsed_num, "mismatched num and identifier"
return parsed_num
def rndc(self, command: str, ignore_errors: bool = False, log: bool = True) -> str:
"""
@ -175,3 +196,19 @@ class NamedInstance:
info(fmt, args)
else:
self._rndc_logger.info(fmt, args)
def stop(self, args: Optional[List[str]] = None) -> None:
"""Stop the instance."""
args = args or []
perl(
f"{os.environ['srcdir']}/stop.pl",
[self.system_test_name, self.identifier] + args,
)
def start(self, args: Optional[List[str]] = None) -> None:
"""Start the instance."""
args = args or []
perl(
f"{os.environ['srcdir']}/start.pl",
[self.system_test_name, self.identifier] + args,
)

View File

@ -10,9 +10,10 @@
# information regarding copyright ownership.
import os
from pathlib import Path
import subprocess
import time
from typing import Optional
from typing import List, Optional
import isctest.log
from isctest.compat import dns_rcode
@ -65,6 +66,51 @@ def cmd(
return exc
def _run_script(
interpreter: str,
script: str,
args: Optional[List[str]] = None,
):
if args is None:
args = []
path = Path(script)
script = str(path)
cwd = os.getcwd()
if not path.exists():
raise FileNotFoundError(f"script {script} not found in {cwd}")
isctest.log.debug("running script: %s %s %s", interpreter, script, " ".join(args))
isctest.log.debug(" workdir: %s", cwd)
returncode = 1
command = [interpreter, script] + args
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
errors="backslashreplace",
) as proc:
if proc.stdout:
for line in proc.stdout:
isctest.log.info(" %s", line.rstrip("\n"))
proc.communicate()
returncode = proc.returncode
if returncode:
raise subprocess.CalledProcessError(returncode, command)
isctest.log.debug(" exited with %d", returncode)
def shell(script: str, args: Optional[List[str]] = None) -> None:
"""Run a given script with system's shell interpreter."""
_run_script(os.environ["SHELL"], script, args)
def perl(script: str, args: Optional[List[str]] = None) -> None:
"""Run a given script with system's perl interpreter."""
_run_script(os.environ["PERL"], script, args)
def retry_with_timeout(func, timeout, delay=1, msg=None):
start_time = time.time()
while time.time() < start_time + timeout:
@ -91,18 +137,6 @@ def get_named_cmdline(cfg_dir, cfg_file="named.conf"):
return named_cmdline
def get_custom_named_instance(assumed_ns):
# This test launches and monitors a named instance itself rather than using
# bin/tests/system/start.pl, so manually defining a NamedInstance here is
# necessary for sending RNDC commands to that instance. If this "custom"
# instance listens on 10.53.0.3, use "ns3" as the identifier passed to
# the NamedInstance constructor.
named_ports = isctest.instance.NamedPorts.from_env()
instance = isctest.instance.NamedInstance(assumed_ns, named_ports)
return instance
def assert_custom_named_is_alive(named_proc, resolver_ip):
assert named_proc.poll() is None, "named isn't running"
msg = dns.message.make_query("version.bind", "TXT", "CH")

View File

@ -170,7 +170,7 @@ def test_named_shutdown(kill_method):
cfg_dir = "resolver"
named_cmdline = isctest.run.get_named_cmdline(cfg_dir)
instance = isctest.run.get_custom_named_instance("ns3")
instance = isctest.instance.NamedInstance("resolver", num=3)
with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log:
with subprocess.Popen(

View File

@ -1,21 +0,0 @@
; <<>> DiG 8.2 <<>> -p @10.53.0.3 +norec data.child.example txt
; (1 server found)
;; res options: init defnam dnsrch
;; got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 216
;; flags: qr ra ad; QUERY: 1, ANSWER: 0, AUTHORITY: 1, ADDITIONAL: 1
;; QUERY SECTION:
;; data.child.example, type = TXT, class = IN
;; AUTHORITY SECTION:
child.example. 5M IN NS ns2.child.example.
;; ADDITIONAL SECTION:
ns2.child.example. 5M IN A 10.53.0.2
;; Total query time: 3 msec
;; FROM: draco to SERVER: 10.53.0.3
;; WHEN: Wed Jun 21 10:58:37 2000
;; MSG SIZE sent: 36 rcvd: 70

View File

@ -1,18 +0,0 @@
; <<>> DiG 8.2 <<>> -p @10.53.0.3 data.child.example txt
; (1 server found)
;; res options: init recurs defnam dnsrch
;; got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 6
;; flags: qr rd ra ad; QUERY: 1, ANSWER: 1, AUTHORITY: 1, ADDITIONAL: 1
;; QUERY SECTION:
;; data.child.example, type = TXT, class = IN
;; ANSWER SECTION:
data.child.example. 5M IN TXT "some" "test" "data"
;; Total query time: 8 msec
;; FROM: draco to SERVER: 10.53.0.3
;; WHEN: Wed Jun 21 10:58:54 2000
;; MSG SIZE sent: 36 rcvd: 97

View File

@ -25,6 +25,15 @@ options {
minimal-responses no;
};
key rndc_key {
secret "1234abcd8765";
algorithm @DEFAULT_HMAC@;
};
controls {
inet 10.53.0.3 port @CONTROLPORT@ allow { any; } keys { rndc_key; };
};
zone "." {
type hint;
file "../../_common/root.hint";

View File

@ -1,100 +0,0 @@
#!/bin/sh
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
set -e
. ../conf.sh
DIGOPTS="+tcp -p ${PORT}"
status=0
echo_i "check that the stub zone has been saved to disk"
for i in 1 2 3 4 5 6 7 8 9 20; do
[ -f ns3/child.example.st ] && break
sleep 1
done
[ -f ns3/child.example.st ] || {
status=1
echo_i "failed"
}
for pass in 1 2; do
echo_i "trying an axfr that should be denied (NOTAUTH) (pass=$pass)"
ret=0
$DIG $DIGOPTS child.example. @10.53.0.3 axfr >dig.out.ns3 || ret=1
grep "; Transfer failed." dig.out.ns3 >/dev/null || ret=1
[ $ret = 0 ] || {
status=1
echo_i "failed"
}
echo_i "look for stub zone data without recursion (should not be found) (pass=$pass)"
for i in 1 2 3 4 5 6 7 8 9; do
ret=0
$DIG $DIGOPTS +norec data.child.example. \
@10.53.0.3 txt >dig.out.ns3 || ret=1
grep "status: NOERROR" dig.out.ns3 >/dev/null || ret=1
[ $ret = 0 ] && break
sleep 1
done
digcomp knowngood.dig.out.norec dig.out.ns3 || ret=1
[ $ret = 0 ] || {
status=1
echo_i "failed"
}
echo_i "look for stub zone data with recursion (should be found) (pass=$pass)"
ret=0
$DIG $DIGOPTS +noauth +noadd data.child.example. @10.53.0.3 txt >dig.out.ns3 || ret=1
digcomp knowngood.dig.out.rec dig.out.ns3 || ret=1
[ $ret = 0 ] || {
status=1
echo_i "failed"
}
[ $pass = 1 ] && {
echo_i "stopping stub server"
stop_server ns3
echo_i "re-starting stub server"
start_server --noclean --restart --port ${PORT} ns3
}
done
echo_i "check that glue record is correctly transferred from primary when minimal-responses is on"
ret=0
# First ensure that zone data was transfered.
for i in 1 2 3 4 5 6 7; do
[ -f ns5/example.db ] && break
sleep 1
done
if [ -f ns5/example.db ]; then
# If NS glue wasn't transferred, this query would fail.
$DIG $DIGOPTS +nodnssec @10.53.0.5 target.example. txt >dig.out.ns5 || ret=1
grep 'target\.example.*TXT.*"test"' dig.out.ns5 >/dev/null || ret=1
# Ensure both ipv4 and ipv6 glue records were transferred.
grep -E 'ns4.example.[[:space:]]+300 IN A[[:space:]]+10.53.0.4' ns5/example.db >/dev/null || ret=1
grep -E 'ns4.example.[[:space:]]+300 IN AAAA[[:space:]]+fd92:7065:b8e:ffff::4' ns5/example.db >/dev/null || ret=1
[ $ret = 0 ] || {
status=1
echo_i "failed"
}
else
status=1
echo_i "failed: stub zone transfer failed ns4(primary) <---> ns5/example.db"
fi
echo_i "exit status: $status"
[ $status -eq 0 ] || exit 1

View File

@ -1,24 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import pytest
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
"ns3/child.example.st",
"ns5/example.db",
]
)
def test_stub(run_tests_sh):
run_tests_sh()

View File

@ -0,0 +1,100 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import dns.message
import pytest
import isctest
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
"ns3/child.example.st",
"ns5/example.db",
]
)
def test_stub_zones_availability(servers):
# check that the stub zone has been saved to disk
assert os.path.exists("ns3/child.example.st")
# try an AXFR that should be denied (NOTAUTH)
def axfr_denied():
msg = dns.message.make_query("child.example.", "AXFR")
res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.notauth(res)
# look for stub zone data without recursion (should not be found)
def stub_zone_lookout_without_recursion():
# drop all flags (dns.flags.RD is set by default)
msg = dns.message.make_query("data.child.example.", "TXT")
msg.flags = 0
res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.noerror(res)
assert not res.answer
assert res.authority[0] == dns.rrset.from_text(
"child.example.", "300", "IN", "NS", "ns2.child.example."
)
assert res.additional[0] == dns.rrset.from_text(
"ns2.child.example.", "300", "IN", "A", "10.53.0.2"
)
# look for stub zone data with recursion (should be found)
def stub_zone_lookout_with_recursion():
# dns.flags.RD is set by default
msg = dns.message.make_query("data.child.example.", "TXT")
res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.noerror(res)
assert res.answer[0] == dns.rrset.from_text(
"data.child.example.", "300", "IN", "TXT", '"some" "test" "data"'
)
axfr_denied()
stub_zone_lookout_without_recursion()
stub_zone_lookout_with_recursion()
servers["ns3"].stop()
servers["ns3"].start(["--noclean", "--restart", "--port", os.environ["PORT"]])
axfr_denied()
stub_zone_lookout_without_recursion()
stub_zone_lookout_with_recursion()
# check that glue record is correctly transferred from primary when the "minimal-responses" option is on
def test_stub_glue_record_with_minimal_response():
# ensure zone data were transfered
assert os.path.exists("ns5/example.db")
# this query would fail if NS glue wasn't transferred
msg_txt = dns.message.make_query("target.example.", "TXT", want_dnssec=False)
res_txt = isctest.query.tcp(msg_txt, "10.53.0.5")
isctest.check.noerror(res_txt)
assert res_txt.answer[0] == dns.rrset.from_text(
"target.example.", "300", "IN", "TXT", '"test"'
)
# ensure both IPv4 and IPv6 glue records were transferred
msg_a = dns.message.make_query("ns4.example.", "A")
res_a = isctest.query.tcp(msg_a, "10.53.0.5")
assert res_a.answer[0] == dns.rrset.from_text(
"ns4.example.", "300", "IN", "A", "10.53.0.4"
)
msg_aaaa = dns.message.make_query("ns4.example.", "AAAA")
res_aaaa = isctest.query.tcp(msg_aaaa, "10.53.0.5")
assert res_aaaa.answer[0] == dns.rrset.from_text(
"ns4.example.", "300", "IN", "AAAA", "fd92:7065:b8e:ffff::4"
)