diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 0ea1c774ea..5b39b9fffd 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -332,6 +332,52 @@ class Key: ) return value + def get_signing_state(self, offline_ksk=False, zsk_missing=False) -> (bool, bool): + """ + This returns the signing state derived from the key states, KRRSIGState + and ZRRSIGState. + + If 'offline_ksk' is set to True, we determine the signing state from + the timing metadata. If 'zsigning' is True, ensure the current time is + between the Active and Retired timing metadata. + + If 'zsk_missing' is set to True, it means the ZSK private key file is + missing, and the KSK should take over signing the RRset, and the + expected zone signing state (zsigning) is reversed. + """ + # Fetch key timing metadata. + now = KeyTimingMetadata.now() + activate = self.get_timing("Activate") + inactive = self.get_timing("Inactive", must_exist=False) + + active = now >= activate + retired = inactive is not None and inactive <= now + signing = active and not retired + + # Fetch key state metadata. + krrsigstate = self.get_metadata("KRRSIGState", must_exist=False) + ksigning = krrsigstate in ["rumoured", "omnipresent"] + zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False) + zsigning = zrrsigstate in ["rumoured", "omnipresent"] + + if ksigning: + assert self.is_ksk() + if zsigning: + assert self.is_zsk() + + # If the ZSK private key file is missing, revers the zone signing state. + if zsk_missing: + zsigning = not zsigning + + # If testing offline KSK, retrieve the signing state from the key timing + # metadata. + if offline_ksk and signing and self.is_zsk(): + assert zsigning + if offline_ksk and signing and self.is_ksk(): + ksigning = signing + + return ksigning, zsigning + def ttl(self) -> int: with open(self.keyfile, "r", encoding="utf-8") as file: for line in file: @@ -772,8 +818,9 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None): assert f"key: {key.tag}" in response -def _check_signatures(signatures, covers, fqdn, keys): - now = KeyTimingMetadata.now() +def _check_signatures( + signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False +): numsigs = 0 zrrsig = True if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]: @@ -781,23 +828,16 @@ def _check_signatures(signatures, covers, fqdn, keys): krrsig = not zrrsig for key in keys: - activate = key.get_timing("Activate") - inactive = key.get_timing("Inactive", must_exist=False) + ksigning, zsigning = key.get_signing_state( + offline_ksk=offline_ksk, zsk_missing=zsk_missing + ) - active = now >= activate - retired = inactive is not None and inactive <= now - signing = active and not retired alg = key.get_metadata("Algorithm") rtype = dns.rdatatype.to_text(covers) expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}" - if not signing: - for rrsig in signatures: - assert re.search(expect, rrsig) is None - continue - - if zrrsig and key.is_zsk(): + if zrrsig and zsigning: has_rrsig = False for rrsig in signatures: if re.search(expect, rrsig) is not None: @@ -806,11 +846,11 @@ def _check_signatures(signatures, covers, fqdn, keys): assert has_rrsig, f"Expected signature but not found: {expect}" numsigs += 1 - if zrrsig and not key.is_zsk(): + if zrrsig and not zsigning: for rrsig in signatures: assert re.search(expect, rrsig) is None - if krrsig and key.is_ksk(): + if krrsig and ksigning: has_rrsig = False for rrsig in signatures: if re.search(expect, rrsig) is not None: @@ -819,14 +859,16 @@ def _check_signatures(signatures, covers, fqdn, keys): assert has_rrsig, f"Expected signature but not found: {expect}" numsigs += 1 - if krrsig and not key.is_ksk(): + if krrsig and not ksigning: for rrsig in signatures: assert re.search(expect, rrsig) is None return numsigs -def check_signatures(rrset, covers, fqdn, ksks, zsks): +def check_signatures( + rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False +): # Check if signatures with covering type are signed with the right keys. # The right keys are the ones that expect a signature and have the # correct role. @@ -840,8 +882,12 @@ def check_signatures(rrset, covers, fqdn, ksks, zsks): rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" signatures.append(rrsig) - numsigs += _check_signatures(signatures, covers, fqdn, ksks) - numsigs += _check_signatures(signatures, covers, fqdn, zsks) + numsigs += _check_signatures( + signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing + ) + numsigs += _check_signatures( + signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing + ) assert numsigs == len(signatures) @@ -965,7 +1011,9 @@ def _query_rrset(server, fqdn, qtype, tsig=None): return rrs, rrsigs -def check_apex(server, zone, ksks, zsks, tsig=None): +def check_apex( + server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None +): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." @@ -973,30 +1021,44 @@ def check_apex(server, zone, ksks, zsks, tsig=None): # test dnskey query dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) check_dnskeys(dnskeys, ksks, zsks) - check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks) + check_signatures( + rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk + ) # test soa query soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() - check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks) + check_signatures( + rrsigs, + dns.rdatatype.SOA, + fqdn, + ksks, + zsks, + offline_ksk=offline_ksk, + zsk_missing=zsk_missing, + ) # test cdnskey query cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig) check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) if len(cdnskeys) > 0: assert len(rrsigs) > 0 - check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks) + check_signatures( + rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk + ) # test cds query cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig) check_cds(cds, ksks) if len(cds) > 0: assert len(rrsigs) > 0 - check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks) + check_signatures( + rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk + ) -def check_subdomain(server, zone, ksks, zsks, tsig=None): +def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." @@ -1014,7 +1076,7 @@ def check_subdomain(server, zone, ksks, zsks, tsig=None): else: assert match in rrset.to_text() - check_signatures(rrsigs, qtype, fqdn, ksks, zsks) + check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk) def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None): diff --git a/bin/tests/system/ksr/tests_ksr.py b/bin/tests/system/ksr/tests_ksr.py index 5512f34fa2..ae020086f9 100644 --- a/bin/tests/system/ksr/tests_ksr.py +++ b/bin/tests/system/ksr/tests_ksr.py @@ -673,9 +673,9 @@ def test_ksr_common(servers): # - check keys check_keys(overlapping_zsks, lifetime, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks) + isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True) def test_ksr_lastbundle(servers): @@ -748,9 +748,9 @@ def test_ksr_lastbundle(servers): # - check keys check_keys(zsks, lifetime, offset=offset, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) # check that last bundle warning is logged warning = "last bundle in skr, please import new skr file" @@ -828,9 +828,9 @@ def test_ksr_inthemiddle(servers): # - check keys check_keys(zsks, lifetime, offset=offset, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) # check that no last bundle warning is logged warning = "last bundle in skr, please import new skr file" @@ -1023,9 +1023,9 @@ def test_ksr_unlimited(servers): # - check keys check_keys(zsks, lifetime, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_twotone(servers): @@ -1141,9 +1141,9 @@ def test_ksr_twotone(servers): lifetime = timedelta(days=31 * 5) check_keys(zsks_altalg, lifetime, alg, size, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_kskroll(servers): @@ -1215,6 +1215,6 @@ def test_ksr_kskroll(servers): # - check keys check_keys(zsks, None, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)