2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-30 14:07:59 +00:00

Add support for TSIG in isctest.kasp

For some kasp test we are going to need TSIG based queries to
differentiate between views.

(cherry picked from commit 9cb287afa0)
This commit is contained in:
Matthijs Mekking
2025-03-14 11:19:40 +01:00
parent e32d49e076
commit ada3b7852e

View File

@@ -21,6 +21,7 @@ from typing import Dict, List, Optional, Union
from datetime import datetime, timedelta, timezone
import dns
import dns.tsig
import isctest.log
import isctest.query
@@ -29,8 +30,14 @@ DEFAULT_TTL = 300
NEXT_KEY_EVENT_THRESHOLD = 100
def _query(server, qname, qtype):
def _query(server, qname, qtype, tsig=None):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
if tsig is not None:
tsigkey = tsig.split(":")
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring)
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
except dns.exception.Timeout:
@@ -554,14 +561,14 @@ class Key:
return self.path
def check_zone_is_signed(server, zone):
def check_zone_is_signed(server, zone, tsig=None):
addr = server.ip
fqdn = f"{zone}."
# wait until zone is fully signed
signed = False
for _ in range(10):
response = _query(server, fqdn, dns.rdatatype.NSEC)
response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig)
if not isinstance(response, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} NSEC from {addr}")
elif response.rcode() != dns.rcode.NOERROR:
@@ -711,13 +718,13 @@ def check_keyrelationships(keys, expected):
assert key.is_metadata_consistent(status, expect.metadata)
def check_dnssec_verify(server, zone):
def check_dnssec_verify(server, zone, tsig=None):
# Check if zone if DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."
verified = False
for _ in range(10):
transfer = _query(server, fqdn, dns.rdatatype.AXFR)
transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig)
if not isinstance(transfer, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
elif transfer.rcode() != dns.rcode.NOERROR:
@@ -936,8 +943,8 @@ def check_cds(rrset, keys):
assert numcds == len(cdss)
def _query_rrset(server, fqdn, qtype):
response = _query(server, fqdn, qtype)
def _query_rrset(server, fqdn, qtype, tsig=None):
response = _query(server, fqdn, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
rrs = []
@@ -957,46 +964,46 @@ def _query_rrset(server, fqdn, qtype):
return rrs, rrsigs
def check_apex(server, zone, ksks, zsks):
def check_apex(server, zone, ksks, zsks, tsig=None):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY)
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
assert len(dnskeys) > 0
check_dnskeys(dnskeys, ksks, zsks)
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA)
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY)
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS)
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
def check_subdomain(server, zone, ksks, zsks):
def check_subdomain(server, zone, ksks, zsks, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
qtype = dns.rdatatype.A
response = _query(server, qname, qtype)
response = _query(server, qname, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"