2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 14:35:26 +00:00

Merge branch 'stepan/hypothesis' into 'main'

Expand the wildcard system test with wider use of hypothesis

See merge request isc-projects/bind9!8461
This commit is contained in:
Petr Špaček
2024-05-28 13:17:32 +00:00
13 changed files with 397 additions and 54 deletions

View File

@@ -55,6 +55,8 @@ variables:
BIND_STRESS_TEST_OS: linux
BIND_STRESS_TEST_ARCH: amd64
HYPOTHESIS_PROFILE: "ci"
default:
# Allow all running CI jobs to be automatically canceled when a new
# version of a branch is pushed.

View File

@@ -5,6 +5,7 @@ disable=
C0115, # missing-class-docstring
C0116, # missing-function-docstring
C0209, # consider-using-f-string
C0301, # line-too-long, handled better by black
C0415, # import-outside-toplevel
R0801, # duplicate-code
R0903, # too-few-public-methods

View File

@@ -12,10 +12,12 @@
from . import check
from . import instance
from . import query
from . import name
from . import rndc
from . import run
from . import log
from . import vars # pylint: disable=redefined-builtin
from . import hypothesis
# isctest.mark module is intentionally NOT imported, because it relies on
# environment variables which might not be set at the time of import of the

View File

@@ -101,3 +101,21 @@ def zones_equal(
def is_executable(cmd: str, errmsg: str) -> None:
executable = shutil.which(cmd)
assert executable is not None, errmsg
def nxdomain(message: dns.message.Message) -> None:
rcode(message, dns.rcode.NXDOMAIN)
def single_question(message: dns.message.Message) -> None:
assert len(message.question) == 1, str(message)
def empty_answer(message: dns.message.Message) -> None:
assert not message.answer, str(message)
def is_response_to(response: dns.message.Message, query: dns.message.Message) -> None:
single_question(response)
single_question(query)
assert query.is_response(response), str(response)

View File

@@ -0,0 +1,13 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from . import settings
from . import strategies

View File

@@ -0,0 +1,18 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
from hypothesis import settings
# Timing of hypothesis tests is flaky in the CI, so we disable deadlines.
settings.register_profile("ci", deadline=None)
settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "default"))

View File

@@ -0,0 +1,170 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import List
from warnings import warn
from hypothesis.strategies import (
binary,
builds,
composite,
integers,
just,
nothing,
permutations,
)
import dns.name
import dns.message
import dns.rdataclass
import dns.rdatatype
import isctest.name
@composite
def dns_names(
draw,
*,
prefix: dns.name.Name = dns.name.empty,
suffix: dns.name.Name = dns.name.root,
min_labels: int = 1,
max_labels: int = 128,
) -> dns.name.Name:
"""
This is a hypothesis strategy to be used for generating DNS names with given `prefix`, `suffix`
and with total number of labels specified by `min_labels` and `max labels`.
For example, calling
```
dns_names(
prefix=dns.name.from_text("test"),
suffix=dns.name.from_text("isc.org"),
max_labels=6
).example()
```
will result in names like `test.abc.isc.org.` or `test.abc.def.isc.org`.
There is no attempt to make the distribution of the generated names uniform in any way.
The strategy however minimizes towards shorter names with shorter labels.
It can be used with to build compound strategies, like this one which generates random DNS queries.
```
dns_queries = builds(
dns.message.make_query,
qname=dns_names(),
rdtype=dns_rdatatypes,
rdclass=dns_rdataclasses,
)
```
"""
prefix = prefix.relativize(dns.name.root)
suffix = suffix.derelativize(dns.name.root)
try:
outer_name = prefix + suffix
remaining_bytes = 255 - isctest.name.len_wire_uncompressed(outer_name)
assert remaining_bytes >= 0
except dns.name.NameTooLong:
warn(
"Maximal length name of name execeeded by prefix and suffix. Strategy won't generate any names.",
RuntimeWarning,
)
return draw(nothing())
minimum_number_of_labels_to_generate = max(0, min_labels - len(outer_name.labels))
maximum_number_of_labels_to_generate = max_labels - len(outer_name.labels)
if maximum_number_of_labels_to_generate < 0:
warn(
"Maximal number of labels execeeded by prefix and suffix. Strategy won't generate any names.",
RuntimeWarning,
)
return draw(nothing())
maximum_number_of_labels_to_generate = min(
maximum_number_of_labels_to_generate, remaining_bytes // 2
)
if maximum_number_of_labels_to_generate < minimum_number_of_labels_to_generate:
warn(
f"Minimal number set to {minimum_number_of_labels_to_generate}, but in {remaining_bytes} bytes there is only space for maximum of {maximum_number_of_labels_to_generate} labels.",
RuntimeWarning,
)
return draw(nothing())
if remaining_bytes == 0 or maximum_number_of_labels_to_generate == 0:
warn(
f"Strategy will return only one name ({outer_name}) as it exactly matches byte or label length limit.",
RuntimeWarning,
)
return draw(just(outer_name))
chosen_number_of_labels_to_generate = draw(
integers(
minimum_number_of_labels_to_generate, maximum_number_of_labels_to_generate
)
)
chosen_number_of_bytes_to_partion = draw(
integers(2 * chosen_number_of_labels_to_generate, remaining_bytes)
)
chosen_lengths_of_labels = draw(
_partition_bytes_to_labels(
chosen_number_of_bytes_to_partion, chosen_number_of_labels_to_generate
)
)
generated_labels = tuple(
draw(binary(min_size=l - 1, max_size=l - 1)) for l in chosen_lengths_of_labels
)
return dns.name.Name(prefix.labels + generated_labels + suffix.labels)
RDATACLASS_MAX = RDATATYPE_MAX = 65535
try:
dns_rdataclasses = builds(dns.rdataclass.RdataClass, integers(0, RDATACLASS_MAX))
dns_rdatatypes = builds(dns.rdatatype.RdataType, integers(0, RDATATYPE_MAX))
except AttributeError:
# In old dnspython versions, RDataTypes and RDataClasses are int and not enums.
dns_rdataclasses = integers(0, RDATACLASS_MAX) # type: ignore
dns_rdatatypes = integers(0, RDATATYPE_MAX) # type: ignore
dns_rdataclasses_without_meta = dns_rdataclasses.filter(dns.rdataclass.is_metaclass)
# NOTE: This should really be `dns_rdatatypes_without_meta = dns_rdatatypes_without_meta.filter(dns.rdatatype.is_metatype()`,
# but hypothesis then complains about the filter being too strict, so it is done in a “constructive” way.
dns_rdatatypes_without_meta = integers(0, dns.rdatatype.OPT - 1) | integers(dns.rdatatype.OPT + 1, 127) | integers(256, RDATATYPE_MAX) # type: ignore
@composite
def _partition_bytes_to_labels(
draw, remaining_bytes: int, number_of_labels: int
) -> List[int]:
two_bytes_reserved_for_label = 2
# Reserve two bytes for each label
partition = [two_bytes_reserved_for_label] * number_of_labels
remaining_bytes -= two_bytes_reserved_for_label * number_of_labels
assert remaining_bytes >= 0
# Add a random number between 0 and the remainder to each partition
for i in range(number_of_labels):
added = draw(
integers(0, min(remaining_bytes, 64 - two_bytes_reserved_for_label))
)
partition[i] += added
remaining_bytes -= added
# NOTE: Some of the remaining bytes will usually not be assigned to any label, but we don't care.
return draw(permutations(partition))

View File

@@ -0,0 +1,20 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns.name
def prepend_label(label: str, name: dns.name.Name) -> dns.name.Name:
return dns.name.Name((label,) + name.labels)
def len_wire_uncompressed(name: dns.name.Name) -> int:
return len(name) + sum(map(len, name.labels))

View File

@@ -24,10 +24,11 @@ def udp(
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
timeout: int = QUERY_TIMEOUT,
) -> dns.message.Message:
if port is None:
port = int(os.environ["PORT"])
return dns.query.udp(message, ip, QUERY_TIMEOUT, port=port, source=source)
return dns.query.udp(message, ip, timeout, port=port, source=source)
def tcp(
@@ -35,7 +36,8 @@ def tcp(
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
timeout: int = QUERY_TIMEOUT,
) -> dns.message.Message:
if port is None:
port = int(os.environ["PORT"])
return dns.query.tcp(message, ip, QUERY_TIMEOUT, port=port, source=source)
return dns.query.tcp(message, ip, timeout, port=port, source=source)

View File

@@ -34,6 +34,12 @@ zone "example" { type primary; file "example.db"; };
zone "nsec" { type primary; file "nsec.db.signed"; };
zone "private.nsec" { type primary; file "private.nsec.db.signed"; };
zone "nestedwild.test" {
type primary;
file "nestedwild.db";
check-names ignore;
};
/*
* The contents of nsec3 and private.nsec3 are specially chosen to
* have separate NSEC3 records for the "no qname proof" and the

View File

@@ -0,0 +1,16 @@
; Copyright (C) Internet Systems Consortium, Inc. ("ISC")
;
; SPDX-License-Identifier: MPL-2.0
;
; This Source Code Form is subject to the terms of the Mozilla Public
; License, v. 2.0. If a copy of the MPL was not distributed with this
; file, you can obtain one at https://mozilla.org/MPL/2.0/.
;
; See the COPYRIGHT file distributed with this work for additional
; information regarding copyright ownership.
$ORIGIN nestedwild.test.
nestedwild.test. 3600 IN SOA . . 0 0 0 0 0
nestedwild.test. 3600 NS ns.example.test.
*.nestedwild.test. 3600 A 192.0.2.1
*.*.*.nestedwild.test. 3600 A 192.0.2.1

View File

@@ -18,6 +18,7 @@ dssets=
# RFC 4592 example zone.
cp allwild.db.in allwild.db
cp example.db.in example.db
cp nestedwild.db.in nestedwild.db
zone=nsec
infile=nsec.db.in

View File

@@ -11,6 +11,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
"""
Example property-based test for wildcard synthesis.
Verifies that otherwise-empty zone with single wildcard record * A 192.0.2.1
@@ -18,8 +19,6 @@ produces synthesized answers for <random_label>.test. A, and returns NODATA for
<random_label>.test. when rdtype is not A.
Limitations - untested properties:
- expansion works with multiple labels
- asterisk in qname does not cause expansion
- empty non-terminals prevent expansion
- or more generally any existing node prevents expansion
- DNSSEC record inclusion
@@ -28,9 +27,10 @@ Limitations - untested properties:
- flags beyond RCODE
- special behavior of rdtypes like CNAME
"""
import pytest
pytest.importorskip("dns")
pytest.importorskip("dns", minversion="2.0.0")
import dns.message
import dns.name
import dns.query
@@ -47,73 +47,147 @@ try:
pytest.importorskip("hypothesis")
except ValueError:
pytest.importorskip("hypothesis", minversion="4.41.2")
from hypothesis import given
from hypothesis.strategies import binary, integers
from hypothesis import assume, example, given
from isctest.hypothesis.strategies import dns_names, dns_rdatatypes_without_meta
import isctest.check
import isctest.name
import isctest.query
# labels of a zone with * A 192.0.2.1 wildcard
WILDCARD_ZONE = ("allwild", "test", "")
SUFFIX = dns.name.from_text("allwild.test.")
WILDCARD_RDTYPE = dns.rdatatype.A
WILDCARD_RDATA = "192.0.2.1"
IPADDR = "10.53.0.1"
IP_ADDR = "10.53.0.1"
TIMEOUT = 5 # seconds, just a sanity check
# Helpers
def is_nonexpanding_rdtype(rdtype):
"""skip meta types to avoid weird rcodes caused by AXFR etc.; RFC 6895"""
return not (
rdtype == WILDCARD_RDTYPE
or dns.rdatatype.is_metatype(rdtype) # known metatypes: OPT ...
or 128 <= rdtype <= 255
) # unknown meta types
@given(name=dns_names(suffix=SUFFIX), rdtype=dns_rdatatypes_without_meta)
def test_wildcard_rdtype_mismatch(
name: dns.name.Name, rdtype: dns.rdatatype.RdataType, named_port: int
) -> None:
"""Any label non-matching rdtype must result in NODATA."""
assume(rdtype != WILDCARD_RDTYPE)
# NS and SOA are present in the zone and DS gets answered from parent.
assume(
not (
name == SUFFIX
and rdtype in (dns.rdatatype.SOA, dns.rdatatype.NS, dns.rdatatype.DS)
)
)
# Subdomains of *.allwild.test. are not to be synthesized.
# See RFC 4592 section 2.2.1.
assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*")
query_msg = dns.message.make_query(name, rdtype)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
isctest.check.noerror(response_msg)
isctest.check.empty_answer(response_msg)
def tcp_query(where, port, qname, qtype):
querymsg = dns.message.make_query(qname, qtype)
assert len(querymsg.question) == 1
return querymsg, dns.query.tcp(querymsg, where, port=port, timeout=TIMEOUT)
@given(name=dns_names(suffix=SUFFIX, min_labels=len(SUFFIX) + 1))
def test_wildcard_match(name: dns.name.Name, named_port: int) -> None:
"""Any label with maching rdtype must result in wildcard data in answer."""
# Subdomains of *.allwild.test. are not to be synthesized.
# See RFC 4592 section 2.2.1.
assume(name.labels[-len(SUFFIX) - 1] != b"*")
def query(where, port, label, rdtype):
labels = (label,) + WILDCARD_ZONE
qname = dns.name.Name(labels)
return tcp_query(where, port, qname, rdtype)
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
# Tests
@given(
label=binary(min_size=1, max_size=63),
rdtype=integers(min_value=0, max_value=65535).filter(is_nonexpanding_rdtype),
)
def test_wildcard_rdtype_mismatch(label, rdtype, named_port):
"""any label non-matching rdtype must result in to NODATA"""
check_answer_nodata(*query(IPADDR, named_port, label, rdtype))
def check_answer_nodata(querymsg, answer):
assert querymsg.is_response(answer), str(answer)
assert answer.rcode() == dns.rcode.NOERROR, str(answer)
assert answer.answer == [], str(answer)
@given(label=binary(min_size=1, max_size=63))
def test_wildcard_match(label, named_port):
"""any label with maching rdtype must result in wildcard data in answer"""
check_answer_noerror(*query(IPADDR, named_port, label, WILDCARD_RDTYPE))
def check_answer_noerror(querymsg, answer):
assert querymsg.is_response(answer), str(answer)
assert answer.rcode() == dns.rcode.NOERROR, str(answer)
assert len(querymsg.question) == 1, str(answer)
isctest.check.is_response_to(response_msg, query_msg)
isctest.check.noerror(response_msg)
expected_answer = [
dns.rrset.from_text(
querymsg.question[0].name,
query_msg.question[0].name,
300, # TTL, ignored by dnspython comparison
dns.rdataclass.IN,
WILDCARD_RDTYPE,
WILDCARD_RDATA,
)
]
assert answer.answer == expected_answer, str(answer)
assert response_msg.answer == expected_answer, str(response_msg)
# Force the `*.*.allwild.test.` corner case to be checked.
@example(name=isctest.name.prepend_label("*", isctest.name.prepend_label("*", SUFFIX)))
@given(
name=dns_names(
suffix=isctest.name.prepend_label("*", SUFFIX), min_labels=len(SUFFIX) + 2
)
)
def test_wildcard_with_star_not_synthesized(
name: dns.name.Name, named_port: int
) -> None:
"""RFC 4592 section 2.2.1 ghost.*.example."""
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
isctest.check.nxdomain(response_msg)
isctest.check.empty_answer(query_msg)
NESTED_SUFFIX = dns.name.from_text("*.*.nestedwild.test.")
# Force `*.*.*.nestedwild.test.` to be checked.
@example(name=isctest.name.prepend_label("*", NESTED_SUFFIX))
@given(name=dns_names(suffix=NESTED_SUFFIX, min_labels=len(NESTED_SUFFIX) + 1))
def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None:
"""Check nested wildcard cases.
There are `*.nestedwild.test. A` and `*.*.*.nestedwild.test. A` records present in their zone.
This means that `foo.*.nestedwild.test. A` must not be synthetized (see test above)
but `foo.*.*.nestedwild.test A` must.
"""
# `*.*.*.nestedwild.test.` and `*.foo.*.*.nestedwild.test.` must be NOERROR
# `foo.*.*.*.nestedwild.test` must be NXDOMAIN (see test below).
assume(
len(name) == len(NESTED_SUFFIX) + 1
or name.labels[-len(NESTED_SUFFIX) - 1] != b"*"
)
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
isctest.check.noerror(response_msg)
expected_answer = [
dns.rrset.from_text(
query_msg.question[0].name,
300, # TTL, ignored by dnspython comparison
dns.rdataclass.IN,
WILDCARD_RDTYPE,
WILDCARD_RDATA,
)
]
assert response_msg.answer == expected_answer, str(response_msg)
@given(
name=dns_names(
suffix=isctest.name.prepend_label("*", NESTED_SUFFIX),
min_labels=len(NESTED_SUFFIX) + 2,
)
)
def test_name_nested_wildcard_subdomains_not_synthesized(
name: dns.name.Name, named_port: int
):
"""Check nested wildcard cases.
`foo.*.*.*.nestedwild.test. A` must not be synthesized.
"""
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
isctest.check.nxdomain(response_msg)
isctest.check.empty_answer(query_msg)