2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-29 21:47:59 +00:00

Use isctest.query.create across system tests

Rather than using the dnspython's facilities and defaults to create the
queries, use the isctest.query.create function in all the cases that
don't require special handling to have consistent defaults.

(cherry picked from commit 64143ea077c3ddb48f808af2d0b05e21209cd268)
This commit is contained in:
Evan Hunt 2025-07-29 16:02:32 -07:00
parent 9decbd88a3
commit 9dcfe0ee1a
27 changed files with 72 additions and 84 deletions

View File

@ -81,7 +81,7 @@ def has_signed_apex_nsec(zone, response):
def do_query(server, qname, qtype, tcp=False): def do_query(server, qname, qtype, tcp=False):
msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) msg = isctest.query.create(qname, qtype)
query_func = isctest.query.tcp if tcp else isctest.query.udp query_func = isctest.query.tcp if tcp else isctest.query.udp
response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR) response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
return response return response

View File

@ -84,7 +84,7 @@ def transfers_complete(servers):
) )
# pylint: disable=redefined-outer-name,unused-argument # pylint: disable=redefined-outer-name,unused-argument
def test_cipher_suites_tls_xfer(qname, ns, rcode, transfers_complete): def test_cipher_suites_tls_xfer(qname, ns, rcode, transfers_complete):
msg = dns.message.make_query(qname, "AXFR") msg = isctest.query.create(qname, "AXFR")
ans = isctest.query.tls(msg, f"10.53.0.{ns}") ans = isctest.query.tls(msg, f"10.53.0.{ns}")
assert ans.rcode() == rcode assert ans.rcode() == rcode
if rcode == dns.rcode.NOERROR: if rcode == dns.rcode.NOERROR:

View File

@ -9,13 +9,13 @@
# See the COPYRIGHT file distributed with this work for additional # See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership. # information regarding copyright ownership.
import isctest import dns
import dns.message import isctest
def test_database(ns1, templates): def test_database(ns1, templates):
msg = dns.message.make_query("database.", "SOA") msg = isctest.query.create("database.", "SOA")
# checking pre reload zone # checking pre reload zone
res = isctest.query.tcp(msg, "10.53.0.1") res = isctest.query.tcp(msg, "10.53.0.1")

View File

@ -18,9 +18,8 @@ import isctest
import isctest.mark import isctest.mark
import pytest import pytest
import dns.message
pytest.importorskip("dns", minversion="2.0.0") pytest.importorskip("dns", minversion="2.0.0")
import dns.rrset
pytestmark = [ pytestmark = [
isctest.mark.with_dnstap, isctest.mark.with_dnstap,
@ -49,7 +48,7 @@ def run_rndc(server, rndc_command):
def test_dnstap_dispatch_socket_addresses(): def test_dnstap_dispatch_socket_addresses():
# Send some query to ns3 so that it records something in its dnstap file. # Send some query to ns3 so that it records something in its dnstap file.
msg = dns.message.make_query("mail.example.", "A") msg = isctest.query.create("mail.example.", "A")
res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR) res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
assert res.answer == [ assert res.answer == [
dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2") dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")

View File

@ -20,11 +20,12 @@ import pytest
pytest.importorskip("dns") pytest.importorskip("dns")
import dns.exception import dns.exception
import dns.message
import dns.name import dns.name
import dns.rdataclass import dns.rdataclass
import dns.rdatatype import dns.rdatatype
import isctest
pytestmark = pytest.mark.extra_artifacts( pytestmark = pytest.mark.extra_artifacts(
[ [
"gnutls-cli.*", "gnutls-cli.*",
@ -35,7 +36,7 @@ pytestmark = pytest.mark.extra_artifacts(
def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport): def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
# Prepare the example/SOA query which will be sent over TLS. # Prepare the example/SOA query which will be sent over TLS.
query = dns.message.make_query("example.", dns.rdatatype.SOA) query = isctest.query.create("example.", dns.rdatatype.SOA)
query_wire = query.to_wire() query_wire = query.to_wire()
query_with_length = struct.pack(">H", len(query_wire)) + query_wire query_with_length = struct.pack(">H", len(query_wire)) + query_wire

View File

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional # See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership. # information regarding copyright ownership.
import dns.message import dns.flags
import pytest import pytest
import isctest import isctest
@ -29,7 +29,7 @@ pytestmark = pytest.mark.extra_artifacts(
def test_dsdigest_good(): def test_dsdigest_good():
"""Check that validation with enabled digest types works""" """Check that validation with enabled digest types works"""
msg = dns.message.make_query("a.good.", "A", want_dnssec=True) msg = isctest.query.create("a.good.", "A")
res = isctest.query.tcp( res = isctest.query.tcp(
msg, msg,
"10.53.0.3", "10.53.0.3",
@ -51,7 +51,7 @@ def test_dsdigest_bad():
def test_dsdigest_insecure(): def test_dsdigest_insecure():
"""Check that validation with not supported digest algorithms is insecure""" """Check that validation with not supported digest algorithms is insecure"""
msg_ds = dns.message.make_query("bad.", "DS", want_dnssec=True) msg_ds = isctest.query.create("bad.", "DS")
res_ds = isctest.query.tcp( res_ds = isctest.query.tcp(
msg_ds, msg_ds,
"10.53.0.4", "10.53.0.4",
@ -59,7 +59,7 @@ def test_dsdigest_insecure():
isctest.check.noerror(res_ds) isctest.check.noerror(res_ds)
assert res_ds.flags & dns.flags.AD assert res_ds.flags & dns.flags.AD
msg_a = dns.message.make_query("a.bad.", "A", want_dnssec=True) msg_a = isctest.query.create("a.bad.", "A")
res_a = isctest.query.tcp( res_a = isctest.query.tcp(
msg_a, msg_a,
"10.53.0.4", "10.53.0.4",

View File

@ -12,7 +12,8 @@
import os import os
import pytest import pytest
import dns.message import dns.flags
import isctest import isctest
@ -29,8 +30,7 @@ pytestmark = pytest.mark.extra_artifacts(
def check_server_soa(resolver): def check_server_soa(resolver):
msg = dns.message.make_query(".", "SOA") msg = isctest.query.create(".", "SOA")
msg.flags += dns.flags.AD
res1 = isctest.query.tcp(msg, "10.53.0.1") res1 = isctest.query.tcp(msg, "10.53.0.1")
res2 = isctest.query.tcp(msg, resolver) res2 = isctest.query.tcp(msg, resolver)
isctest.check.rrsets_equal(res1.answer, res2.answer) isctest.check.rrsets_equal(res1.answer, res2.answer)

View File

@ -9,8 +9,6 @@
# See the COPYRIGHT file distributed with this work for additional # See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership. # information regarding copyright ownership.
import dns.message
import isctest import isctest
@ -21,11 +19,11 @@ def test_emptyzones(ns1, templates):
watcher.wait_for_line("all zones loaded") watcher.wait_for_line("all zones loaded")
templates.render("ns1/named.conf", {"automatic_empty_zones": True}) templates.render("ns1/named.conf", {"automatic_empty_zones": True})
ns1.rndc("reload") ns1.rndc("reload")
msg = dns.message.make_query("version.bind", "TXT", "CH") msg = isctest.query.create("version.bind", "TXT", "CH")
res = isctest.query.tcp(msg, "10.53.0.1") res = isctest.query.tcp(msg, "10.53.0.1")
isctest.check.noerror(res) isctest.check.noerror(res)
# check that allow-transfer { none; } works # check that allow-transfer { none; } works
msg = dns.message.make_query("10.in-addr.arpa", "AXFR") msg = isctest.query.create("10.in-addr.arpa", "AXFR")
res = isctest.query.tcp(msg, "10.53.0.1") res = isctest.query.tcp(msg, "10.53.0.1")
isctest.check.refused(res) isctest.check.refused(res)

View File

@ -10,6 +10,7 @@
# information regarding copyright ownership. # information regarding copyright ownership.
import dns.flags
import dns.message import dns.message
import pytest import pytest
@ -29,7 +30,7 @@ pytestmark = pytest.mark.extra_artifacts(
def test_glue_full_glue_set(): def test_glue_full_glue_set():
"""test that a ccTLD referral gets a full glue set from the root zone""" """test that a ccTLD referral gets a full glue set from the root zone"""
msg = dns.message.make_query("foo.bar.fi", "A") msg = isctest.query.create("foo.bar.fi", "A")
msg.flags &= ~dns.flags.RD msg.flags &= ~dns.flags.RD
res = isctest.query.udp(msg, "10.53.0.1") res = isctest.query.udp(msg, "10.53.0.1")
@ -60,7 +61,7 @@ NS.UU.NET. 172800 IN A 137.39.1.3
def test_glue_no_glue_set(): def test_glue_no_glue_set():
"""test that out-of-zone glue is not found""" """test that out-of-zone glue is not found"""
msg = dns.message.make_query("example.net.", "A") msg = isctest.query.create("example.net.", "A")
msg.flags &= ~dns.flags.RD msg.flags &= ~dns.flags.RD
res = isctest.query.udp(msg, "10.53.0.1") res = isctest.query.udp(msg, "10.53.0.1")

View File

@ -9,15 +9,11 @@
# See the COPYRIGHT file distributed with this work for additional # See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership. # information regarding copyright ownership.
import pytest
import isctest import isctest
pytest.importorskip("dns")
import dns.message
def test_async_hook(): def test_async_hook():
msg = dns.message.make_query("example.com.", "A") msg = isctest.query.create("example.com.", "A")
res = isctest.query.udp(msg, "10.53.0.1") res = isctest.query.udp(msg, "10.53.0.1")
# the test-async plugin changes the status of any positive answer to NOTIMP # the test-async plugin changes the status of any positive answer to NOTIMP
isctest.check.notimp(res) isctest.check.notimp(res)

View File

@ -11,10 +11,10 @@
import os import os
import isctest import dns.rrset
import pytest import pytest
import dns.message import isctest
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -26,7 +26,7 @@ import dns.message
], ],
) )
def test_include_multiplecfg(qname): def test_include_multiplecfg(qname):
msg = dns.message.make_query(qname, "A") msg = isctest.query.create(qname, "A")
res = isctest.query.tcp(msg, "10.53.0.2") res = isctest.query.tcp(msg, "10.53.0.2")
isctest.check.noerror(res) isctest.check.noerror(res)

View File

@ -14,8 +14,8 @@ import shutil
from typing import Optional from typing import Optional
import dns.flags import dns.flags
import dns.rcode
import dns.message import dns.message
import dns.rcode
import dns.zone import dns.zone
import isctest.log import isctest.log
@ -155,7 +155,7 @@ def is_executable(cmd: str, errmsg: str) -> None:
def named_alive(named_proc, resolver_ip): def named_alive(named_proc, resolver_ip):
assert named_proc.poll() is None, "named isn't running" assert named_proc.poll() is None, "named isn't running"
msg = dns.message.make_query("version.bind", "TXT", "CH") msg = isctest.query.create("version.bind", "TXT", "CH")
isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR) isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR)

View File

@ -22,10 +22,10 @@ from typing import Dict, List, Optional, Tuple, Union
import dns import dns
import dns.tsig import dns.tsig
from isctest.instance import NamedInstance
import isctest.log import isctest.log
import isctest.query import isctest.query
import isctest.util import isctest.util
from isctest.instance import NamedInstance
from isctest.vars.algorithms import Algorithm, ALL_ALGORITHMS_BY_NUM from isctest.vars.algorithms import Algorithm, ALL_ALGORITHMS_BY_NUM
DEFAULT_TTL = 300 DEFAULT_TTL = 300
@ -34,7 +34,7 @@ NEXT_KEY_EVENT_THRESHOLD = 100
def _query(server, qname, qtype, tsig=None): def _query(server, qname, qtype, tsig=None):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) query = isctest.query.create(qname, qtype)
if tsig is not None: if tsig is not None:
tsigkey = tsig.split(":") tsigkey = tsig.split(":")

View File

@ -805,7 +805,7 @@ def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, ns4):
qname = f"view.{zone}." qname = f"view.{zone}."
qtype = dns.rdatatype.TXT qtype = dns.rdatatype.TXT
rdata = txt_rdata rdata = txt_rdata
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) query = isctest.query.create(qname, qtype)
tsigkey = tsig.split(":") tsigkey = tsig.split(":")
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0]) keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring) query.use_tsig(keyring)
@ -1612,7 +1612,7 @@ def test_kasp_reload_restart(ns6):
def query_soa(qname): def query_soa(qname):
fqdn = dns.name.from_text(qname) fqdn = dns.name.from_text(qname)
qtype = dns.rdatatype.SOA qtype = dns.rdatatype.SOA
query = dns.message.make_query(fqdn, qtype, use_edns=True, want_dnssec=True) query = isctest.query.create(fqdn, qtype)
try: try:
response = isctest.query.tcp(query, ns6.ip, ns6.ports.dns, timeout=3) response = isctest.query.tcp(query, ns6.ip, ns6.ports.dns, timeout=3)
except dns.exception.Timeout: except dns.exception.Timeout:

View File

@ -14,11 +14,10 @@ import itertools
import isctest import isctest
import pytest import pytest
import dns.message
# Everything from getting a big answer to creating an RR set with thousands # Everything from getting a big answer to creating an RR set with thousands
# of records takes minutes of CPU and real time with dnspython < 2.0.0. # of records takes minutes of CPU and real time with dnspython < 2.0.0.
pytest.importorskip("dns", minversion="2.0.0") pytest.importorskip("dns", minversion="2.0.0")
import dns.rrset
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -32,7 +31,7 @@ pytest.importorskip("dns", minversion="2.0.0")
], ],
) )
def test_limits(name, limit): def test_limits(name, limit):
msg_query = dns.message.make_query(f"{name}.example.", "A") msg_query = isctest.query.create(f"{name}.example.", "A")
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False) res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
iplist = [ iplist = [
@ -46,7 +45,7 @@ def test_limits(name, limit):
def test_limit_exceeded(): def test_limit_exceeded():
msg_query = dns.message.make_query("5000.example.", "A") msg_query = isctest.query.create("5000.example.", "A")
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False) res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
assert res.flags & dns.flags.TC, "TC flag was not set" assert res.flags & dns.flags.TC, "TC flag was not set"

View File

@ -19,7 +19,7 @@ import isctest
def test_masterfile_include_semantics(): def test_masterfile_include_semantics():
"""Test master file $INCLUDE semantics""" """Test master file $INCLUDE semantics"""
msg_axfr = dns.message.make_query("include.", "AXFR") msg_axfr = isctest.query.create("include.", "AXFR")
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1") res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
axfr_include_semantics = """;ANSWER axfr_include_semantics = """;ANSWER
include. 300 IN SOA ns.include. hostmaster.include. 1 3600 1800 1814400 3600 include. 300 IN SOA ns.include. hostmaster.include. 1 3600 1800 1814400 3600
@ -40,7 +40,7 @@ ns.include. 300 IN A 127.0.0.1
def test_masterfile_bind_8_compat_semantics(): def test_masterfile_bind_8_compat_semantics():
"""Test master file BIND 8 TTL and $TTL semantics compatibility""" """Test master file BIND 8 TTL and $TTL semantics compatibility"""
msg_axfr = dns.message.make_query("ttl1.", "AXFR") msg_axfr = isctest.query.create("ttl1.", "AXFR")
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1") res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
axfr_ttl_semantics = """;ANSWER axfr_ttl_semantics = """;ANSWER
ttl1. 3 IN SOA ns.ttl1. hostmaster.ttl1. 1 3600 1800 1814400 3 ttl1. 3 IN SOA ns.ttl1. hostmaster.ttl1. 1 3600 1800 1814400 3
@ -59,7 +59,7 @@ ns.ttl1. 3 IN A 10.53.0.1
def test_masterfile_rfc_1035_semantics(): def test_masterfile_rfc_1035_semantics():
"""Test master file RFC1035 TTL and $TTL semantics""" """Test master file RFC1035 TTL and $TTL semantics"""
msg_axfr = dns.message.make_query("ttl2.", "AXFR") msg_axfr = isctest.query.create("ttl2.", "AXFR")
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1") res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
axfr_ttl_semantics = """;ANSWER axfr_ttl_semantics = """;ANSWER
ttl2. 1 IN SOA ns.ttl2. hostmaster.ttl2. 1 3600 1800 1814400 3 ttl2. 1 IN SOA ns.ttl2. hostmaster.ttl2. 1 3600 1800 1814400 3
@ -78,7 +78,7 @@ ns.ttl2. 1 IN A 10.53.0.1
def test_masterfile_missing_master_file(): def test_masterfile_missing_master_file():
"""Test nameserver running with a missing master file""" """Test nameserver running with a missing master file"""
msg_soa = dns.message.make_query("example.", "SOA") msg_soa = isctest.query.create("example.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2") res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
expected_soa_rr = """;ANSWER expected_soa_rr = """;ANSWER
example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600 example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
@ -89,7 +89,7 @@ example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
def test_masterfile_missing_master_file_servfail(): def test_masterfile_missing_master_file_servfail():
"""Test nameserver returning SERVFAIL for a missing master file""" """Test nameserver returning SERVFAIL for a missing master file"""
msg_soa = dns.message.make_query("missing.", "SOA") msg_soa = isctest.query.create("missing.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2") res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
isctest.check.servfail(res_soa) isctest.check.servfail(res_soa)

View File

@ -13,8 +13,6 @@ import pytest
pytest.importorskip("dns", minversion="2.7.0") pytest.importorskip("dns", minversion="2.7.0")
import dns.message
import isctest import isctest
@ -22,7 +20,7 @@ import isctest
# about twice as large as the answer with compression enabled, while # about twice as large as the answer with compression enabled, while
# maintaining identical content. # maintaining identical content.
def test_names(): def test_names():
msg = dns.message.make_query("example.", "MX") msg = isctest.query.create("example.", "MX")
# Getting message size with compression enabled # Getting message size with compression enabled
res_enabled = isctest.query.tcp(msg, ip="10.53.0.1", source="10.53.0.1") res_enabled = isctest.query.tcp(msg, ip="10.53.0.1", source="10.53.0.1")
# Getting message size with compression disabled # Getting message size with compression disabled

View File

@ -17,8 +17,6 @@ import isctest
import isctest.mark import isctest.mark
import isctest.run import isctest.run
import dns.message
pytestmark = [ pytestmark = [
isctest.mark.with_lmdb, isctest.mark.with_lmdb,
pytest.mark.extra_artifacts( pytest.mark.extra_artifacts(
@ -29,7 +27,7 @@ pytestmark = [
def test_nzd2nzf(ns1): def test_nzd2nzf(ns1):
zone_data = '"added.example" { type primary; file "added.db"; };' zone_data = '"added.example" { type primary; file "added.db"; };'
msg = dns.message.make_query("a.added.example.", "A") msg = isctest.query.create("a.added.example.", "A")
# query for non-existing zone data # query for non-existing zone data
res = isctest.query.tcp(msg, ns1.ip) res = isctest.query.tcp(msg, ns1.ip)

View File

@ -12,10 +12,9 @@
# information regarding copyright ownership. # information regarding copyright ownership.
import pytest import pytest
import isctest import isctest
pytest.importorskip("dns")
import dns.message
pytestmark = pytest.mark.extra_artifacts( pytestmark = pytest.mark.extra_artifacts(
[ [
@ -26,7 +25,7 @@ pytestmark = pytest.mark.extra_artifacts(
def test_querysource_none(): def test_querysource_none():
msg = dns.message.make_query("example.", "A", want_dnssec=False) msg = isctest.query.create("example.", "A", dnssec=False)
res = isctest.query.udp(msg, "10.53.0.2") res = isctest.query.udp(msg, "10.53.0.2")
isctest.check.noerror(res) isctest.check.noerror(res)
@ -43,7 +42,7 @@ def test_querysource_none():
# using a different name below to make sure we don't use the # using a different name below to make sure we don't use the
# resolver cache # resolver cache
msg = dns.message.make_query("exampletwo.", "A", want_dnssec=False) msg = isctest.query.create("exampletwo.", "A", dnssec=False)
res = isctest.query.udp(msg, "fd92:7065:b8e:ffff::2") res = isctest.query.udp(msg, "fd92:7065:b8e:ffff::2")
isctest.check.noerror(res) isctest.check.noerror(res)

View File

@ -15,10 +15,9 @@ import socket
import time import time
import pytest import pytest
import isctest import isctest
pytest.importorskip("dns")
import dns.message
pytestmark = pytest.mark.extra_artifacts( pytestmark = pytest.mark.extra_artifacts(
[ [
@ -66,6 +65,6 @@ def test_cve_2023_3341(control_port):
# Wait for named to (possibly) crash # Wait for named to (possibly) crash
time.sleep(10) time.sleep(10)
msg = dns.message.make_query("version.bind", "TXT", "CH") msg = isctest.query.create("version.bind", "TXT", "CH")
res = isctest.query.udp(msg, "10.53.0.2") res = isctest.query.udp(msg, "10.53.0.2")
isctest.check.noerror(res) isctest.check.noerror(res)

View File

@ -12,13 +12,16 @@
# information regarding copyright ownership. # information regarding copyright ownership.
import os import os
import pytest import pytest
pytest.importorskip("dns", minversion="2.0.0") pytest.importorskip("dns", minversion="2.0.0")
import dns.rcode
import dns.rrset
import isctest import isctest
from isctest.compat import dns_rcode from isctest.compat import dns_rcode
import dns.message
pytestmark = pytest.mark.extra_artifacts( pytestmark = pytest.mark.extra_artifacts(
[ [
@ -70,7 +73,7 @@ pytestmark = pytest.mark.extra_artifacts(
) )
def test_rpz_multiple_views(qname, source, rcode): def test_rpz_multiple_views(qname, source, rcode):
# Wait for the rpz-external.local zone transfer # Wait for the rpz-external.local zone transfer
msg = dns.message.make_query("rpz-external.local", "SOA") msg = isctest.query.create("rpz-external.local", "SOA")
isctest.query.tcp( isctest.query.tcp(
msg, msg,
ip="10.53.0.3", ip="10.53.0.3",
@ -84,7 +87,7 @@ def test_rpz_multiple_views(qname, source, rcode):
expected_rcode=dns_rcode.NOERROR, expected_rcode=dns_rcode.NOERROR,
) )
msg = dns.message.make_query(qname, "A") msg = isctest.query.create(qname, "A")
res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode) res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode)
if rcode == dns.rcode.NOERROR: if rcode == dns.rcode.NOERROR:
assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")] assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
@ -94,7 +97,7 @@ def test_rpz_passthru_logging():
resolver_ip = "10.53.0.3" resolver_ip = "10.53.0.3"
# Should generate a log entry into rpz_passthru.txt # Should generate a log entry into rpz_passthru.txt
msg_allowed = dns.message.make_query("allowed.", "A") msg_allowed = isctest.query.create("allowed.", "A")
res_allowed = isctest.query.udp( res_allowed = isctest.query.udp(
msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR
) )
@ -103,7 +106,7 @@ def test_rpz_passthru_logging():
] ]
# Should also generate a log entry into rpz_passthru.txt # Should also generate a log entry into rpz_passthru.txt
msg_allowed_any = dns.message.make_query("allowed.", "ANY") msg_allowed_any = isctest.query.create("allowed.", "ANY")
res_allowed_any = isctest.query.udp( res_allowed_any = isctest.query.udp(
msg_allowed_any, msg_allowed_any,
resolver_ip, resolver_ip,
@ -121,7 +124,7 @@ def test_rpz_passthru_logging():
# baddomain.com isn't allowed (CNAME .), should return NXDOMAIN # baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# Should generate a log entry into rpz.txt # Should generate a log entry into rpz.txt
msg_not_allowed = dns.message.make_query("baddomain.", "A") msg_not_allowed = isctest.query.create("baddomain.", "A")
res_not_allowed = isctest.query.udp( res_not_allowed = isctest.query.udp(
msg_not_allowed, msg_not_allowed,
resolver_ip, resolver_ip,

View File

@ -105,7 +105,7 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries
) )
qname = relname + ".test" qname = relname + ".test"
msg = dns.message.make_query(qname, "A") msg = isctest.query.create(qname, "A")
futures[ futures[
executor.submit( executor.submit(
isctest.query.udp, msg, resolver_ip, timeout=1, attempts=1 isctest.query.udp, msg, resolver_ip, timeout=1, attempts=1

View File

@ -32,14 +32,14 @@ def test_stub_zones_availability(ns3):
# try an AXFR that should be denied (NOTAUTH) # try an AXFR that should be denied (NOTAUTH)
def axfr_denied(): def axfr_denied():
msg = dns.message.make_query("child.example.", "AXFR") msg = isctest.query.create("child.example.", "AXFR")
res = isctest.query.tcp(msg, "10.53.0.3") res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.notauth(res) isctest.check.notauth(res)
# look for stub zone data without recursion (should not be found) # look for stub zone data without recursion (should not be found)
def stub_zone_lookout_without_recursion(): def stub_zone_lookout_without_recursion():
# drop all flags (dns.flags.RD is set by default) # drop all flags (dns.flags.RD is set by default)
msg = dns.message.make_query("data.child.example.", "TXT") msg = isctest.query.create("data.child.example.", "TXT")
msg.flags = 0 msg.flags = 0
res = isctest.query.tcp(msg, "10.53.0.3") res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.noerror(res) isctest.check.noerror(res)
@ -54,7 +54,7 @@ def test_stub_zones_availability(ns3):
# look for stub zone data with recursion (should be found) # look for stub zone data with recursion (should be found)
def stub_zone_lookout_with_recursion(): def stub_zone_lookout_with_recursion():
# dns.flags.RD is set by default # dns.flags.RD is set by default
msg = dns.message.make_query("data.child.example.", "TXT") msg = isctest.query.create("data.child.example.", "TXT")
res = isctest.query.tcp(msg, "10.53.0.3") res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.noerror(res) isctest.check.noerror(res)
assert res.answer[0] == dns.rrset.from_text( assert res.answer[0] == dns.rrset.from_text(
@ -79,7 +79,7 @@ def test_stub_glue_record_with_minimal_response():
assert os.path.exists("ns5/example.db") assert os.path.exists("ns5/example.db")
# this query would fail if NS glue wasn't transferred # this query would fail if NS glue wasn't transferred
msg_txt = dns.message.make_query("target.example.", "TXT", want_dnssec=False) msg_txt = isctest.query.create("target.example.", "TXT", dnssec=False)
res_txt = isctest.query.tcp(msg_txt, "10.53.0.5") res_txt = isctest.query.tcp(msg_txt, "10.53.0.5")
isctest.check.noerror(res_txt) isctest.check.noerror(res_txt)
assert res_txt.answer[0] == dns.rrset.from_text( assert res_txt.answer[0] == dns.rrset.from_text(
@ -87,13 +87,13 @@ def test_stub_glue_record_with_minimal_response():
) )
# ensure both IPv4 and IPv6 glue records were transferred # ensure both IPv4 and IPv6 glue records were transferred
msg_a = dns.message.make_query("ns4.example.", "A") msg_a = isctest.query.create("ns4.example.", "A")
res_a = isctest.query.tcp(msg_a, "10.53.0.5") res_a = isctest.query.tcp(msg_a, "10.53.0.5")
assert res_a.answer[0] == dns.rrset.from_text( assert res_a.answer[0] == dns.rrset.from_text(
"ns4.example.", "300", "IN", "A", "10.53.0.4" "ns4.example.", "300", "IN", "A", "10.53.0.4"
) )
msg_aaaa = dns.message.make_query("ns4.example.", "AAAA") msg_aaaa = isctest.query.create("ns4.example.", "AAAA")
res_aaaa = isctest.query.tcp(msg_aaaa, "10.53.0.5") res_aaaa = isctest.query.tcp(msg_aaaa, "10.53.0.5")
assert res_aaaa.answer[0] == dns.rrset.from_text( assert res_aaaa.answer[0] == dns.rrset.from_text(
"ns4.example.", "300", "IN", "AAAA", "fd92:7065:b8e:ffff::4" "ns4.example.", "300", "IN", "AAAA", "fd92:7065:b8e:ffff::4"

View File

@ -56,7 +56,7 @@ class CraftedTKEYQuery:
rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata) rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata)
# Prepare complete TKEY query to send # Prepare complete TKEY query to send
self.msg = dns.message.make_query( self.msg = isctest.query.create(
dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY
) )
self.msg.additional.append(rrset) self.msg.additional.append(rrset)

View File

@ -13,9 +13,6 @@ import pytest
import isctest import isctest
pytest.importorskip("dns")
import dns.message
@pytest.mark.parametrize( @pytest.mark.parametrize(
"qname,rdtype,expected_ttl", "qname,rdtype,expected_ttl",
@ -27,7 +24,7 @@ import dns.message
], ],
) )
def test_cache_ttl(qname, rdtype, expected_ttl): def test_cache_ttl(qname, rdtype, expected_ttl):
msg = dns.message.make_query(qname, rdtype) msg = isctest.query.create(qname, rdtype)
response = isctest.query.udp(msg, "10.53.0.2") response = isctest.query.udp(msg, "10.53.0.2")
for rr in response.answer + response.authority: for rr in response.answer + response.authority:
assert rr.ttl == expected_ttl assert rr.ttl == expected_ttl

View File

@ -94,7 +94,7 @@ def test_wildcard_rdtype_mismatch(
# See RFC 4592 section 2.2.1. # See RFC 4592 section 2.2.1.
assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*") assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*")
query_msg = dns.message.make_query(name, rdtype) query_msg = isctest.query.create(name, rdtype)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg) isctest.check.is_response_to(response_msg, query_msg)
@ -111,7 +111,7 @@ def test_wildcard_match(name: dns.name.Name, named_port: int) -> None:
# See RFC 4592 section 2.2.1. # See RFC 4592 section 2.2.1.
assume(name.labels[-len(SUFFIX) - 1] != b"*") assume(name.labels[-len(SUFFIX) - 1] != b"*")
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg) isctest.check.is_response_to(response_msg, query_msg)
@ -140,7 +140,7 @@ def test_wildcard_with_star_not_synthesized(
name: dns.name.Name, named_port: int name: dns.name.Name, named_port: int
) -> None: ) -> None:
"""RFC 4592 section 2.2.1 ghost.*.example.""" """RFC 4592 section 2.2.1 ghost.*.example."""
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg) isctest.check.is_response_to(response_msg, query_msg)
@ -170,7 +170,7 @@ def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None
or name.labels[-len(NESTED_SUFFIX) - 1] != b"*" or name.labels[-len(NESTED_SUFFIX) - 1] != b"*"
) )
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg) isctest.check.is_response_to(response_msg, query_msg)
@ -201,7 +201,7 @@ def test_name_nested_wildcard_subdomains_not_synthesized(
`foo.*.*.*.nestedwild.test. A` must not be synthesized. `foo.*.*.*.nestedwild.test. A` must not be synthesized.
""" """
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg) isctest.check.is_response_to(response_msg, query_msg)

View File

@ -60,8 +60,8 @@ def test_xferquota(named_port, ns1, ns2):
isctest.run.retry_with_timeout(check_line_count, timeout=360) isctest.run.retry_with_timeout(check_line_count, timeout=360)
axfr_msg = dns.message.make_query("zone000099.example.", "AXFR") axfr_msg = isctest.query.create("zone000099.example.", "AXFR")
a_msg = dns.message.make_query("a.changing.", "A") a_msg = isctest.query.create("a.changing.", "A")
def query_and_compare(msg): def query_and_compare(msg):
ns1response = isctest.query.tcp(msg, "10.53.0.1") ns1response = isctest.query.tcp(msg, "10.53.0.1")