From fddf9f778b49f454e68cbefea8c897ac3bd0ea44 Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Mon, 17 Mar 2025 11:52:18 +0100 Subject: [PATCH] Update kasp check_signatures for dnssec-policy The check_signatures code was initially created to be suitable for the ksr system test, to test the Offline KSK feature. For that, a key is expected to be signing if the current time is between the timing metadata Active and Retired. With dnssec-policy, the key timing metadata is indicative, the key states determine the actual signing behavior. Update the check_signatures function so that by default the signing is derived from the key states (ksigning and zsigning). Add an argument 'offline_ksk', if set the make sure that the zsigning is set if the current time is between the Active and Retired timing metadata, and for ksigning we just use the timing metadata (as the key is offline, we cannot check the key states). Another (upcoming) test case is where key files are missing. When the ZSK private key file is missing, the KSK takes over. Add an argument 'zsk_missing', when set to True the expected zone signing (zsigning) is reversed. --- bin/tests/system/isctest/kasp.py | 114 +++++++++++++++++++++++------- bin/tests/system/ksr/tests_ksr.py | 24 +++---- 2 files changed, 100 insertions(+), 38 deletions(-) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 0ea1c774ea..5b39b9fffd 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -332,6 +332,52 @@ class Key: ) return value + def get_signing_state(self, offline_ksk=False, zsk_missing=False) -> (bool, bool): + """ + This returns the signing state derived from the key states, KRRSIGState + and ZRRSIGState. + + If 'offline_ksk' is set to True, we determine the signing state from + the timing metadata. If 'zsigning' is True, ensure the current time is + between the Active and Retired timing metadata. + + If 'zsk_missing' is set to True, it means the ZSK private key file is + missing, and the KSK should take over signing the RRset, and the + expected zone signing state (zsigning) is reversed. + """ + # Fetch key timing metadata. + now = KeyTimingMetadata.now() + activate = self.get_timing("Activate") + inactive = self.get_timing("Inactive", must_exist=False) + + active = now >= activate + retired = inactive is not None and inactive <= now + signing = active and not retired + + # Fetch key state metadata. + krrsigstate = self.get_metadata("KRRSIGState", must_exist=False) + ksigning = krrsigstate in ["rumoured", "omnipresent"] + zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False) + zsigning = zrrsigstate in ["rumoured", "omnipresent"] + + if ksigning: + assert self.is_ksk() + if zsigning: + assert self.is_zsk() + + # If the ZSK private key file is missing, revers the zone signing state. + if zsk_missing: + zsigning = not zsigning + + # If testing offline KSK, retrieve the signing state from the key timing + # metadata. + if offline_ksk and signing and self.is_zsk(): + assert zsigning + if offline_ksk and signing and self.is_ksk(): + ksigning = signing + + return ksigning, zsigning + def ttl(self) -> int: with open(self.keyfile, "r", encoding="utf-8") as file: for line in file: @@ -772,8 +818,9 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None): assert f"key: {key.tag}" in response -def _check_signatures(signatures, covers, fqdn, keys): - now = KeyTimingMetadata.now() +def _check_signatures( + signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False +): numsigs = 0 zrrsig = True if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]: @@ -781,23 +828,16 @@ def _check_signatures(signatures, covers, fqdn, keys): krrsig = not zrrsig for key in keys: - activate = key.get_timing("Activate") - inactive = key.get_timing("Inactive", must_exist=False) + ksigning, zsigning = key.get_signing_state( + offline_ksk=offline_ksk, zsk_missing=zsk_missing + ) - active = now >= activate - retired = inactive is not None and inactive <= now - signing = active and not retired alg = key.get_metadata("Algorithm") rtype = dns.rdatatype.to_text(covers) expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}" - if not signing: - for rrsig in signatures: - assert re.search(expect, rrsig) is None - continue - - if zrrsig and key.is_zsk(): + if zrrsig and zsigning: has_rrsig = False for rrsig in signatures: if re.search(expect, rrsig) is not None: @@ -806,11 +846,11 @@ def _check_signatures(signatures, covers, fqdn, keys): assert has_rrsig, f"Expected signature but not found: {expect}" numsigs += 1 - if zrrsig and not key.is_zsk(): + if zrrsig and not zsigning: for rrsig in signatures: assert re.search(expect, rrsig) is None - if krrsig and key.is_ksk(): + if krrsig and ksigning: has_rrsig = False for rrsig in signatures: if re.search(expect, rrsig) is not None: @@ -819,14 +859,16 @@ def _check_signatures(signatures, covers, fqdn, keys): assert has_rrsig, f"Expected signature but not found: {expect}" numsigs += 1 - if krrsig and not key.is_ksk(): + if krrsig and not ksigning: for rrsig in signatures: assert re.search(expect, rrsig) is None return numsigs -def check_signatures(rrset, covers, fqdn, ksks, zsks): +def check_signatures( + rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False +): # Check if signatures with covering type are signed with the right keys. # The right keys are the ones that expect a signature and have the # correct role. @@ -840,8 +882,12 @@ def check_signatures(rrset, covers, fqdn, ksks, zsks): rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" signatures.append(rrsig) - numsigs += _check_signatures(signatures, covers, fqdn, ksks) - numsigs += _check_signatures(signatures, covers, fqdn, zsks) + numsigs += _check_signatures( + signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing + ) + numsigs += _check_signatures( + signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing + ) assert numsigs == len(signatures) @@ -965,7 +1011,9 @@ def _query_rrset(server, fqdn, qtype, tsig=None): return rrs, rrsigs -def check_apex(server, zone, ksks, zsks, tsig=None): +def check_apex( + server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None +): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." @@ -973,30 +1021,44 @@ def check_apex(server, zone, ksks, zsks, tsig=None): # test dnskey query dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) check_dnskeys(dnskeys, ksks, zsks) - check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks) + check_signatures( + rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk + ) # test soa query soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() - check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks) + check_signatures( + rrsigs, + dns.rdatatype.SOA, + fqdn, + ksks, + zsks, + offline_ksk=offline_ksk, + zsk_missing=zsk_missing, + ) # test cdnskey query cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig) check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) if len(cdnskeys) > 0: assert len(rrsigs) > 0 - check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks) + check_signatures( + rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk + ) # test cds query cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig) check_cds(cds, ksks) if len(cds) > 0: assert len(rrsigs) > 0 - check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks) + check_signatures( + rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk + ) -def check_subdomain(server, zone, ksks, zsks, tsig=None): +def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." @@ -1014,7 +1076,7 @@ def check_subdomain(server, zone, ksks, zsks, tsig=None): else: assert match in rrset.to_text() - check_signatures(rrsigs, qtype, fqdn, ksks, zsks) + check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk) def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None): diff --git a/bin/tests/system/ksr/tests_ksr.py b/bin/tests/system/ksr/tests_ksr.py index 5512f34fa2..ae020086f9 100644 --- a/bin/tests/system/ksr/tests_ksr.py +++ b/bin/tests/system/ksr/tests_ksr.py @@ -673,9 +673,9 @@ def test_ksr_common(servers): # - check keys check_keys(overlapping_zsks, lifetime, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks) + isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True) def test_ksr_lastbundle(servers): @@ -748,9 +748,9 @@ def test_ksr_lastbundle(servers): # - check keys check_keys(zsks, lifetime, offset=offset, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) # check that last bundle warning is logged warning = "last bundle in skr, please import new skr file" @@ -828,9 +828,9 @@ def test_ksr_inthemiddle(servers): # - check keys check_keys(zsks, lifetime, offset=offset, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) # check that no last bundle warning is logged warning = "last bundle in skr, please import new skr file" @@ -1023,9 +1023,9 @@ def test_ksr_unlimited(servers): # - check keys check_keys(zsks, lifetime, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_twotone(servers): @@ -1141,9 +1141,9 @@ def test_ksr_twotone(servers): lifetime = timedelta(days=31 * 5) check_keys(zsks_altalg, lifetime, alg, size, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_kskroll(servers): @@ -1215,6 +1215,6 @@ def test_ksr_kskroll(servers): # - check keys check_keys(zsks, None, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks) + isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain - isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)