mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-30 22:15:20 +00:00
[9.20] chg: test: Improve pytest log output
- increase clarity of multiline messages - support `isc.query.*()` query&response logging - replace use of `print()` statement with proper logging - omit empty lines from test result output Backport of MR !10590 Merge branch 'backport-nicki/improve-pytest-logging-9.20' into 'bind-9.20' See merge request isc-projects/bind9!10660
This commit is contained in:
@@ -73,9 +73,9 @@ def has_signed_apex_nsec(zone, response):
|
|||||||
has_rrsig = True
|
has_rrsig = True
|
||||||
|
|
||||||
if not has_nsec:
|
if not has_nsec:
|
||||||
print("error: missing apex NSEC record in response")
|
isctest.log.error("missing apex NSEC record in response")
|
||||||
if not has_rrsig:
|
if not has_rrsig:
|
||||||
print("error: missing NSEC signature in response")
|
isctest.log.error("missing NSEC signature in response")
|
||||||
|
|
||||||
return has_nsec and has_rrsig
|
return has_nsec and has_rrsig
|
||||||
|
|
||||||
@@ -103,8 +103,7 @@ def verify_zone(zone, transfer):
|
|||||||
verifier = isctest.run.cmd(verify_cmd)
|
verifier = isctest.run.cmd(verify_cmd)
|
||||||
|
|
||||||
if verifier.returncode != 0:
|
if verifier.returncode != 0:
|
||||||
print(f"error: dnssec-verify {zone} failed")
|
isctest.log.error(f"dnssec-verify {zone} failed")
|
||||||
sys.stderr.buffer.write(verifier.stderr)
|
|
||||||
|
|
||||||
return verifier.returncode == 0
|
return verifier.returncode == 0
|
||||||
|
|
||||||
@@ -132,7 +131,7 @@ def read_statefile(server, zone):
|
|||||||
), f"expected a single DS in response for {zone} from {server.ip}, got {count}"
|
), f"expected a single DS in response for {zone} from {server.ip}, got {count}"
|
||||||
|
|
||||||
filename = f"ns9/K{zone}+013+{keyid:05d}.state"
|
filename = f"ns9/K{zone}+013+{keyid:05d}.state"
|
||||||
print(f"read state file {filename}")
|
isctest.log.debug(f"read state file {filename}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(filename, "r", encoding="utf-8") as file:
|
with open(filename, "r", encoding="utf-8") as file:
|
||||||
@@ -212,8 +211,7 @@ def rekey(zone):
|
|||||||
controller = isctest.run.cmd(rndc_cmd)
|
controller = isctest.run.cmd(rndc_cmd)
|
||||||
|
|
||||||
if controller.returncode != 0:
|
if controller.returncode != 0:
|
||||||
print(f"error: rndc loadkeys {zone} failed")
|
isctest.log.error(f"rndc loadkeys {zone} failed")
|
||||||
sys.stderr.buffer.write(controller.stderr)
|
|
||||||
|
|
||||||
assert controller.returncode == 0
|
assert controller.returncode == 0
|
||||||
|
|
||||||
|
@@ -360,12 +360,13 @@ def system_test_dir(request, system_test_name, expected_artifacts):
|
|||||||
if node.nodeid in all_test_results
|
if node.nodeid in all_test_results
|
||||||
}
|
}
|
||||||
assert len(test_results)
|
assert len(test_results)
|
||||||
messages = []
|
|
||||||
for node, result in test_results.items():
|
for node, result in test_results.items():
|
||||||
isctest.log.debug("%s %s", result.outcome.upper(), node)
|
message = f"{result.outcome.upper()} {node}"
|
||||||
messages.extend(result.messages.values())
|
nonempty_extra = [msg for msg in result.messages.values() if msg.strip()]
|
||||||
for message in messages:
|
if nonempty_extra:
|
||||||
isctest.log.debug("\n" + message)
|
message += "\n"
|
||||||
|
message += "\n\n".join(nonempty_extra)
|
||||||
|
isctest.log.debug(message)
|
||||||
failed = any(res.outcome == "failed" for res in test_results.values())
|
failed = any(res.outcome == "failed" for res in test_results.values())
|
||||||
skipped = any(res.outcome == "skipped" for res in test_results.values())
|
skipped = any(res.outcome == "skipped" for res in test_results.values())
|
||||||
if failed:
|
if failed:
|
||||||
|
@@ -44,7 +44,7 @@ def run_rndc(server, rndc_command):
|
|||||||
cmdline = [rndc, "-c", "../_common/rndc.conf", "-p", port, "-s", server]
|
cmdline = [rndc, "-c", "../_common/rndc.conf", "-p", port, "-s", server]
|
||||||
cmdline.extend(rndc_command)
|
cmdline.extend(rndc_command)
|
||||||
|
|
||||||
isctest.run.cmd(cmdline, log_stdout=True)
|
isctest.run.cmd(cmdline)
|
||||||
|
|
||||||
|
|
||||||
def test_dnstap_dispatch_socket_addresses():
|
def test_dnstap_dispatch_socket_addresses():
|
||||||
@@ -64,7 +64,6 @@ def test_dnstap_dispatch_socket_addresses():
|
|||||||
# Read the contents of the dnstap file using dnstap-read.
|
# Read the contents of the dnstap file using dnstap-read.
|
||||||
run = isctest.run.cmd(
|
run = isctest.run.cmd(
|
||||||
[isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"],
|
[isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"],
|
||||||
log_stdout=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check whether all frames contain the expected addresses.
|
# Check whether all frames contain the expected addresses.
|
||||||
|
@@ -451,7 +451,7 @@ class Key:
|
|||||||
str(self.keyfile),
|
str(self.keyfile),
|
||||||
]
|
]
|
||||||
|
|
||||||
out = isctest.run.cmd(dsfromkey_command, log_stdout=True)
|
out = isctest.run.cmd(dsfromkey_command)
|
||||||
dsfromkey = out.stdout.decode("utf-8").split()
|
dsfromkey = out.stdout.decode("utf-8").split()
|
||||||
|
|
||||||
rdata_fromfile = " ".join(dsfromkey[:7])
|
rdata_fromfile = " ".join(dsfromkey[:7])
|
||||||
|
@@ -11,10 +11,12 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import textwrap
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
|
|
||||||
|
|
||||||
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
|
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
|
||||||
|
LOG_INDENT = 4
|
||||||
|
|
||||||
LOGGERS = {
|
LOGGERS = {
|
||||||
"conftest": None,
|
"conftest": None,
|
||||||
@@ -77,6 +79,13 @@ def deinit_test_logger():
|
|||||||
LOGGERS["test"] = None
|
LOGGERS["test"] = None
|
||||||
|
|
||||||
|
|
||||||
|
def indent_message(msg):
|
||||||
|
lines = msg.splitlines()
|
||||||
|
first = lines[0] + "\n"
|
||||||
|
to_indent = "\n".join(lines[1:])
|
||||||
|
return first + textwrap.indent(to_indent, " " * LOG_INDENT)
|
||||||
|
|
||||||
|
|
||||||
def log(lvl: int, msg: str, *args, **kwargs):
|
def log(lvl: int, msg: str, *args, **kwargs):
|
||||||
"""Log message with the most-specific logger currently available."""
|
"""Log message with the most-specific logger currently available."""
|
||||||
logger = LOGGERS["test"]
|
logger = LOGGERS["test"]
|
||||||
@@ -85,6 +94,8 @@ def log(lvl: int, msg: str, *args, **kwargs):
|
|||||||
if logger is None:
|
if logger is None:
|
||||||
logger = LOGGERS["conftest"]
|
logger = LOGGERS["conftest"]
|
||||||
assert logger is not None
|
assert logger is not None
|
||||||
|
if "\n" in msg:
|
||||||
|
msg = indent_message(msg)
|
||||||
logger.log(lvl, msg, *args, **kwargs)
|
logger.log(lvl, msg, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
@@ -32,6 +32,8 @@ def generic_query(
|
|||||||
attempts: int = 10,
|
attempts: int = 10,
|
||||||
expected_rcode: dns_rcode = None,
|
expected_rcode: dns_rcode = None,
|
||||||
verify: bool = False,
|
verify: bool = False,
|
||||||
|
log_query: bool = True,
|
||||||
|
log_response: bool = True,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
if port is None:
|
if port is None:
|
||||||
if query_func.__name__ == "tls":
|
if query_func.__name__ == "tls":
|
||||||
@@ -51,15 +53,25 @@ def generic_query(
|
|||||||
|
|
||||||
res = None
|
res = None
|
||||||
for attempt in range(attempts):
|
for attempt in range(attempts):
|
||||||
isctest.log.debug(
|
log_msg = (
|
||||||
f"{query_func.__name__}(): ip={ip}, port={port}, source={source}, "
|
f"isc.query.{query_func.__name__}(): ip={ip}, port={port}, source={source}, "
|
||||||
f"timeout={timeout}, attempts left={attempts-attempt}"
|
f"timeout={timeout}, attempts left={attempts-attempt}"
|
||||||
)
|
)
|
||||||
|
if log_query:
|
||||||
|
log_msg += f"\n{message.to_text()}"
|
||||||
|
log_query = False # only log query on first attempt
|
||||||
|
isctest.log.debug(log_msg)
|
||||||
try:
|
try:
|
||||||
res = query_func(**query_args)
|
res = query_func(**query_args)
|
||||||
except (dns.exception.Timeout, ConnectionRefusedError) as e:
|
except (dns.exception.Timeout, ConnectionRefusedError) as e:
|
||||||
isctest.log.debug(f"{query_func.__name__}(): the '{e}' exception raised")
|
isctest.log.debug(
|
||||||
|
f"isc.query.{query_func.__name__}(): the '{e}' exception raised"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
|
if log_response:
|
||||||
|
isctest.log.debug(
|
||||||
|
f"isc.query.{query_func.__name__}(): response\n{res.to_text()}"
|
||||||
|
)
|
||||||
if res.rcode() == expected_rcode or expected_rcode is None:
|
if res.rcode() == expected_rcode or expected_rcode is None:
|
||||||
return res
|
return res
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
@@ -67,7 +79,7 @@ def generic_query(
|
|||||||
if expected_rcode is not None:
|
if expected_rcode is not None:
|
||||||
last_rcode = dns_rcode.to_text(res.rcode()) if res else None
|
last_rcode = dns_rcode.to_text(res.rcode()) if res else None
|
||||||
isctest.log.debug(
|
isctest.log.debug(
|
||||||
f"{query_func.__name__}(): expected rcode={dns_rcode.to_text(expected_rcode)}, last rcode={last_rcode}"
|
f"isc.query.{query_func.__name__}(): expected rcode={dns_rcode.to_text(expected_rcode)}, last rcode={last_rcode}"
|
||||||
)
|
)
|
||||||
raise dns.exception.Timeout
|
raise dns.exception.Timeout
|
||||||
|
|
||||||
|
@@ -24,24 +24,24 @@ def cmd(
|
|||||||
timeout=60,
|
timeout=60,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
log_stdout=False,
|
log_stdout=True,
|
||||||
log_stderr=True,
|
log_stderr=True,
|
||||||
input_text: Optional[bytes] = None,
|
input_text: Optional[bytes] = None,
|
||||||
raise_on_exception=True,
|
raise_on_exception=True,
|
||||||
env: Optional[dict] = None,
|
env: Optional[dict] = None,
|
||||||
):
|
):
|
||||||
"""Execute a command with given args as subprocess."""
|
"""Execute a command with given args as subprocess."""
|
||||||
isctest.log.debug(f"command: {' '.join(args)}")
|
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
|
||||||
|
|
||||||
def print_debug_logs(procdata):
|
def print_debug_logs(procdata):
|
||||||
if procdata:
|
if procdata:
|
||||||
if log_stdout and procdata.stdout:
|
if log_stdout and procdata.stdout:
|
||||||
isctest.log.debug(
|
isctest.log.debug(
|
||||||
f"~~~ cmd stdout ~~~\n{procdata.stdout.decode('utf-8')}\n~~~~~~~~~~~~~~~~~~"
|
f"isctest.run.cmd(): (stdout)\n{procdata.stdout.decode('utf-8')}"
|
||||||
)
|
)
|
||||||
if log_stderr and procdata.stderr:
|
if log_stderr and procdata.stderr:
|
||||||
isctest.log.debug(
|
isctest.log.debug(
|
||||||
f"~~~ cmd stderr ~~~\n{procdata.stderr.decode('utf-8')}\n~~~~~~~~~~~~~~~~~~"
|
f"isctest.run.cmd(): (stderr)\n{procdata.stderr.decode('utf-8')}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if env is None:
|
if env is None:
|
||||||
@@ -62,7 +62,7 @@ def cmd(
|
|||||||
return proc
|
return proc
|
||||||
except subprocess.CalledProcessError as exc:
|
except subprocess.CalledProcessError as exc:
|
||||||
print_debug_logs(exc)
|
print_debug_logs(exc)
|
||||||
isctest.log.debug(f" return code: {exc.returncode}")
|
isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
|
||||||
if raise_on_exception:
|
if raise_on_exception:
|
||||||
raise exc
|
raise exc
|
||||||
return exc
|
return exc
|
||||||
@@ -111,7 +111,6 @@ class Dig:
|
|||||||
"""Run the dig command with the given parameters and return the decoded output."""
|
"""Run the dig command with the given parameters and return the decoded output."""
|
||||||
return cmd(
|
return cmd(
|
||||||
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
|
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
|
||||||
log_stdout=True,
|
|
||||||
).stdout.decode("utf-8")
|
).stdout.decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
@@ -1258,7 +1258,7 @@ def test_kasp_dnssec_keygen():
|
|||||||
zone,
|
zone,
|
||||||
]
|
]
|
||||||
|
|
||||||
return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
|
return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
|
||||||
|
|
||||||
# check that 'dnssec-keygen -k' (configured policy) creates valid files.
|
# check that 'dnssec-keygen -k' (configured policy) creates valid files.
|
||||||
keyprops = [
|
keyprops = [
|
||||||
@@ -1294,7 +1294,7 @@ def test_kasp_dnssec_keygen():
|
|||||||
str(publish),
|
str(publish),
|
||||||
key.path,
|
key.path,
|
||||||
]
|
]
|
||||||
out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
|
out = isctest.run.cmd(settime).stdout.decode("utf-8")
|
||||||
|
|
||||||
isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
|
isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
|
||||||
assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
|
assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
|
||||||
@@ -1343,7 +1343,7 @@ def test_kasp_dnssec_keygen():
|
|||||||
str(now),
|
str(now),
|
||||||
key.path,
|
key.path,
|
||||||
]
|
]
|
||||||
out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
|
out = isctest.run.cmd(settime).stdout.decode("utf-8")
|
||||||
isctest.kasp.check_keys("kasp", keys, expected)
|
isctest.kasp.check_keys("kasp", keys, expected)
|
||||||
isctest.kasp.check_keytimes(keys, expected)
|
isctest.kasp.check_keytimes(keys, expected)
|
||||||
|
|
||||||
@@ -1378,7 +1378,7 @@ def test_kasp_dnssec_keygen():
|
|||||||
str(now),
|
str(now),
|
||||||
key.path,
|
key.path,
|
||||||
]
|
]
|
||||||
out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
|
out = isctest.run.cmd(settime).stdout.decode("utf-8")
|
||||||
isctest.kasp.check_keys("kasp", keys, expected)
|
isctest.kasp.check_keys("kasp", keys, expected)
|
||||||
isctest.kasp.check_keytimes(keys, expected)
|
isctest.kasp.check_keytimes(keys, expected)
|
||||||
|
|
||||||
@@ -1423,7 +1423,7 @@ def test_kasp_dnssec_keygen():
|
|||||||
str(soon),
|
str(soon),
|
||||||
key.path,
|
key.path,
|
||||||
]
|
]
|
||||||
out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
|
out = isctest.run.cmd(settime).stdout.decode("utf-8")
|
||||||
isctest.kasp.check_keys("kasp", keys, expected)
|
isctest.kasp.check_keys("kasp", keys, expected)
|
||||||
isctest.kasp.check_keytimes(keys, expected)
|
isctest.kasp.check_keytimes(keys, expected)
|
||||||
|
|
||||||
|
@@ -70,13 +70,12 @@ def token_init_and_cleanup():
|
|||||||
isctest.run.cmd(
|
isctest.run.cmd(
|
||||||
token_cleanup_command,
|
token_cleanup_command,
|
||||||
env=EMPTY_OPENSSL_CONF_ENV,
|
env=EMPTY_OPENSSL_CONF_ENV,
|
||||||
log_stdout=True,
|
|
||||||
raise_on_exception=False,
|
raise_on_exception=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
output = isctest.run.cmd(
|
output = isctest.run.cmd(
|
||||||
token_init_command, env=EMPTY_OPENSSL_CONF_ENV, log_stdout=True
|
token_init_command, env=EMPTY_OPENSSL_CONF_ENV
|
||||||
).stdout.decode("utf-8")
|
).stdout.decode("utf-8")
|
||||||
assert "The token has been initialized and is reassigned to slot" in output
|
assert "The token has been initialized and is reassigned to slot" in output
|
||||||
yield
|
yield
|
||||||
@@ -84,7 +83,6 @@ def token_init_and_cleanup():
|
|||||||
output = isctest.run.cmd(
|
output = isctest.run.cmd(
|
||||||
token_cleanup_command,
|
token_cleanup_command,
|
||||||
env=EMPTY_OPENSSL_CONF_ENV,
|
env=EMPTY_OPENSSL_CONF_ENV,
|
||||||
log_stdout=True,
|
|
||||||
raise_on_exception=False,
|
raise_on_exception=False,
|
||||||
).stdout.decode("utf-8")
|
).stdout.decode("utf-8")
|
||||||
assert re.search("Found token (.*) with matching token label", output)
|
assert re.search("Found token (.*) with matching token label", output)
|
||||||
@@ -129,7 +127,7 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
|
|||||||
]
|
]
|
||||||
|
|
||||||
output = isctest.run.cmd(
|
output = isctest.run.cmd(
|
||||||
pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV, log_stdout=True
|
pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
|
||||||
).stdout.decode("utf-8")
|
).stdout.decode("utf-8")
|
||||||
|
|
||||||
assert "Key pair generated" in output
|
assert "Key pair generated" in output
|
||||||
@@ -148,7 +146,7 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
|
|||||||
zone,
|
zone,
|
||||||
]
|
]
|
||||||
|
|
||||||
output = isctest.run.cmd(keyfrlab_command, log_stdout=True)
|
output = isctest.run.cmd(keyfrlab_command)
|
||||||
output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
|
output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
|
||||||
|
|
||||||
assert os.path.exists(output_decoded)
|
assert os.path.exists(output_decoded)
|
||||||
@@ -188,6 +186,6 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
|
|||||||
zone,
|
zone,
|
||||||
zone_file,
|
zone_file,
|
||||||
]
|
]
|
||||||
isctest.run.cmd(signer_command, log_stdout=True)
|
isctest.run.cmd(signer_command)
|
||||||
|
|
||||||
assert os.path.exists(f"{zone_file}.signed")
|
assert os.path.exists(f"{zone_file}.signed")
|
||||||
|
@@ -96,9 +96,7 @@ def ksr(zone, policy, action, options="", raise_on_exception=True):
|
|||||||
zone,
|
zone,
|
||||||
]
|
]
|
||||||
|
|
||||||
out = isctest.run.cmd(
|
out = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
|
||||||
ksr_command, log_stdout=True, raise_on_exception=raise_on_exception
|
|
||||||
)
|
|
||||||
return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
|
return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
@@ -33,7 +33,7 @@ pytest.importorskip("dns", minversion="2.0.0")
|
|||||||
)
|
)
|
||||||
def test_limits(name, limit):
|
def test_limits(name, limit):
|
||||||
msg_query = dns.message.make_query(f"{name}.example.", "A")
|
msg_query = dns.message.make_query(f"{name}.example.", "A")
|
||||||
res = isctest.query.tcp(msg_query, "10.53.0.1")
|
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
|
||||||
|
|
||||||
iplist = [
|
iplist = [
|
||||||
f"10.0.{x}.{y}"
|
f"10.0.{x}.{y}"
|
||||||
@@ -47,6 +47,6 @@ def test_limits(name, limit):
|
|||||||
|
|
||||||
def test_limit_exceeded():
|
def test_limit_exceeded():
|
||||||
msg_query = dns.message.make_query("5000.example.", "A")
|
msg_query = dns.message.make_query("5000.example.", "A")
|
||||||
res = isctest.query.tcp(msg_query, "10.53.0.1")
|
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
|
||||||
|
|
||||||
assert res.flags & dns.flags.TC, "TC flag was not set"
|
assert res.flags & dns.flags.TC, "TC flag was not set"
|
||||||
|
@@ -265,7 +265,7 @@ def test_rollover_multisigner(servers):
|
|||||||
zone,
|
zone,
|
||||||
]
|
]
|
||||||
|
|
||||||
return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
|
return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
|
||||||
|
|
||||||
zone = "multisigner-model2.kasp"
|
zone = "multisigner-model2.kasp"
|
||||||
|
|
||||||
|
@@ -43,7 +43,7 @@ def test_max_rsa_exponent_size_good(exponent_size, templates):
|
|||||||
def test_max_rsa_exponent_size_bad(exponent_size, templates):
|
def test_max_rsa_exponent_size_bad(exponent_size, templates):
|
||||||
templates.render("options.conf", {"max_rsa_exponent_size": exponent_size})
|
templates.render("options.conf", {"max_rsa_exponent_size": exponent_size})
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
isctest.run.cmd([CHECKCONF, "options.conf"], log_stdout=True)
|
isctest.run.cmd([CHECKCONF, "options.conf"])
|
||||||
|
|
||||||
|
|
||||||
def test_rsa_big_exponent_keys_cant_load():
|
def test_rsa_big_exponent_keys_cant_load():
|
||||||
|
@@ -54,7 +54,9 @@ def update_zone(test_state, zone):
|
|||||||
update = dns.update.UpdateMessage(zone)
|
update = dns.update.UpdateMessage(zone)
|
||||||
update.add(f"dynamic-{i}.{zone}", 300, "TXT", f"txt-{i}")
|
update.add(f"dynamic-{i}.{zone}", 300, "TXT", f"txt-{i}")
|
||||||
try:
|
try:
|
||||||
response = isctest.query.udp(update, server)
|
response = isctest.query.udp(
|
||||||
|
update, server, log_query=False, log_response=False
|
||||||
|
)
|
||||||
assert response.rcode() == dns.rcode.NOERROR
|
assert response.rcode() == dns.rcode.NOERROR
|
||||||
except dns.exception.Timeout:
|
except dns.exception.Timeout:
|
||||||
isctest.log.info(f"error: query timeout for {zone}")
|
isctest.log.info(f"error: query timeout for {zone}")
|
||||||
|
@@ -17,7 +17,6 @@ A tool for reproducing ISC SPNEGO vulnerabilities
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import datetime
|
|
||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@@ -177,14 +176,8 @@ def send_crafted_tkey_query(opts: argparse.Namespace) -> None:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
query = CraftedTKEYQuery(opts).msg
|
query = CraftedTKEYQuery(opts).msg
|
||||||
print("# > " + str(datetime.datetime.now()))
|
|
||||||
print(query.to_text())
|
|
||||||
print()
|
|
||||||
|
|
||||||
response = isctest.query.tcp(query, opts.server_ip, timeout=2)
|
isctest.query.tcp(query, opts.server_ip, timeout=2)
|
||||||
print("# < " + str(datetime.datetime.now()))
|
|
||||||
print(response.to_text())
|
|
||||||
print()
|
|
||||||
|
|
||||||
|
|
||||||
def test_cve_2020_8625():
|
def test_cve_2020_8625():
|
||||||
|
@@ -47,7 +47,7 @@ VERIFY = os.environ.get("VERIFY")
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_verify_good_zone_files(zone):
|
def test_verify_good_zone_files(zone):
|
||||||
isctest.run.cmd([VERIFY, "-z", "-o", zone, f"zones/{zone}.good"], log_stdout=True)
|
isctest.run.cmd([VERIFY, "-z", "-o", zone, f"zones/{zone}.good"])
|
||||||
|
|
||||||
|
|
||||||
def test_verify_good_zone_nsec_next_name_case_mismatch():
|
def test_verify_good_zone_nsec_next_name_case_mismatch():
|
||||||
@@ -58,7 +58,6 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
|
|||||||
"nsec-next-name-case-mismatch",
|
"nsec-next-name-case-mismatch",
|
||||||
"zones/nsec-next-name-case-mismatch.good",
|
"zones/nsec-next-name-case-mismatch.good",
|
||||||
],
|
],
|
||||||
log_stdout=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -67,7 +66,6 @@ def get_bad_zone_output(zone):
|
|||||||
output = isctest.run.cmd(
|
output = isctest.run.cmd(
|
||||||
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
|
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
|
||||||
raise_on_exception=False,
|
raise_on_exception=False,
|
||||||
log_stdout=True,
|
|
||||||
)
|
)
|
||||||
stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
|
stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
|
||||||
return stream
|
return stream
|
||||||
|
Reference in New Issue
Block a user