diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index fa3c297f78..7c05206d0f 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -21,6 +21,7 @@ from typing import Dict, List, Optional, Union from datetime import datetime, timedelta, timezone import dns +import dns.tsig import isctest.log import isctest.query @@ -29,8 +30,14 @@ DEFAULT_TTL = 300 NEXT_KEY_EVENT_THRESHOLD = 100 -def _query(server, qname, qtype): +def _query(server, qname, qtype, tsig=None): query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) + + if tsig is not None: + tsigkey = tsig.split(":") + keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0]) + query.use_tsig(keyring) + try: response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3) except dns.exception.Timeout: @@ -554,14 +561,14 @@ class Key: return self.path -def check_zone_is_signed(server, zone): +def check_zone_is_signed(server, zone, tsig=None): addr = server.ip fqdn = f"{zone}." # wait until zone is fully signed signed = False for _ in range(10): - response = _query(server, fqdn, dns.rdatatype.NSEC) + response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig) if not isinstance(response, dns.message.Message): isctest.log.debug(f"no response for {fqdn} NSEC from {addr}") elif response.rcode() != dns.rcode.NOERROR: @@ -711,13 +718,13 @@ def check_keyrelationships(keys, expected): assert key.is_metadata_consistent(status, expect.metadata) -def check_dnssec_verify(server, zone): +def check_dnssec_verify(server, zone, tsig=None): # Check if zone if DNSSEC valid with dnssec-verify. fqdn = f"{zone}." verified = False for _ in range(10): - transfer = _query(server, fqdn, dns.rdatatype.AXFR) + transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig) if not isinstance(transfer, dns.message.Message): isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}") elif transfer.rcode() != dns.rcode.NOERROR: @@ -936,8 +943,8 @@ def check_cds(rrset, keys): assert numcds == len(cdss) -def _query_rrset(server, fqdn, qtype): - response = _query(server, fqdn, qtype) +def _query_rrset(server, fqdn, qtype, tsig=None): + response = _query(server, fqdn, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR rrs = [] @@ -957,46 +964,46 @@ def _query_rrset(server, fqdn, qtype): return rrs, rrsigs -def check_apex(server, zone, ksks, zsks): +def check_apex(server, zone, ksks, zsks, tsig=None): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." # test dnskey query - dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY) + dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) assert len(dnskeys) > 0 check_dnskeys(dnskeys, ksks, zsks) assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks) # test soa query - soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA) + soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks) # test cdnskey query - cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY) + cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig) check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) if len(cdnskeys) > 0: assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks) # test cds query - cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS) + cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig) check_cds(cds, ksks) if len(cds) > 0: assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks) -def check_subdomain(server, zone, ksks, zsks): +def check_subdomain(server, zone, ksks, zsks, tsig=None): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." qtype = dns.rdatatype.A - response = _query(server, qname, qtype) + response = _query(server, qname, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"