2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-30 05:57:52 +00:00

Update kasp check_signatures for dnssec-policy

The check_signatures code was initially created to be suitable for
the ksr system test, to test the Offline KSK feature. For that, a
key is expected to be signing if the current time is between
the timing metadata Active and Retired.

With dnssec-policy, the key timing metadata is indicative, the key
states determine the actual signing behavior.

Update the check_signatures function so that by default the signing
is derived from the key states (ksigning and zsigning). Add an
argument 'offline_ksk', if set the make sure that the zsigning is set
if the current time is between the Active and Retired timing metadata,
and for ksigning we just use the timing metadata (as the key is offline,
we cannot check the key states).

Another (upcoming) test case is where key files are missing. When the
ZSK private key file is missing, the KSK takes over. Add an argument
'zsk_missing', when set to True the expected zone signing (zsigning)
is reversed.
This commit is contained in:
Matthijs Mekking 2025-03-17 11:52:18 +01:00
parent 43ded45ae9
commit fddf9f778b
2 changed files with 100 additions and 38 deletions

View File

@ -332,6 +332,52 @@ class Key:
)
return value
def get_signing_state(self, offline_ksk=False, zsk_missing=False) -> (bool, bool):
"""
This returns the signing state derived from the key states, KRRSIGState
and ZRRSIGState.
If 'offline_ksk' is set to True, we determine the signing state from
the timing metadata. If 'zsigning' is True, ensure the current time is
between the Active and Retired timing metadata.
If 'zsk_missing' is set to True, it means the ZSK private key file is
missing, and the KSK should take over signing the RRset, and the
expected zone signing state (zsigning) is reversed.
"""
# Fetch key timing metadata.
now = KeyTimingMetadata.now()
activate = self.get_timing("Activate")
inactive = self.get_timing("Inactive", must_exist=False)
active = now >= activate
retired = inactive is not None and inactive <= now
signing = active and not retired
# Fetch key state metadata.
krrsigstate = self.get_metadata("KRRSIGState", must_exist=False)
ksigning = krrsigstate in ["rumoured", "omnipresent"]
zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False)
zsigning = zrrsigstate in ["rumoured", "omnipresent"]
if ksigning:
assert self.is_ksk()
if zsigning:
assert self.is_zsk()
# If the ZSK private key file is missing, revers the zone signing state.
if zsk_missing:
zsigning = not zsigning
# If testing offline KSK, retrieve the signing state from the key timing
# metadata.
if offline_ksk and signing and self.is_zsk():
assert zsigning
if offline_ksk and signing and self.is_ksk():
ksigning = signing
return ksigning, zsigning
def ttl(self) -> int:
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
@ -772,8 +818,9 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None):
assert f"key: {key.tag}" in response
def _check_signatures(signatures, covers, fqdn, keys):
now = KeyTimingMetadata.now()
def _check_signatures(
signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False
):
numsigs = 0
zrrsig = True
if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]:
@ -781,23 +828,16 @@ def _check_signatures(signatures, covers, fqdn, keys):
krrsig = not zrrsig
for key in keys:
activate = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
ksigning, zsigning = key.get_signing_state(
offline_ksk=offline_ksk, zsk_missing=zsk_missing
)
active = now >= activate
retired = inactive is not None and inactive <= now
signing = active and not retired
alg = key.get_metadata("Algorithm")
rtype = dns.rdatatype.to_text(covers)
expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}"
if not signing:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
continue
if zrrsig and key.is_zsk():
if zrrsig and zsigning:
has_rrsig = False
for rrsig in signatures:
if re.search(expect, rrsig) is not None:
@ -806,11 +846,11 @@ def _check_signatures(signatures, covers, fqdn, keys):
assert has_rrsig, f"Expected signature but not found: {expect}"
numsigs += 1
if zrrsig and not key.is_zsk():
if zrrsig and not zsigning:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
if krrsig and key.is_ksk():
if krrsig and ksigning:
has_rrsig = False
for rrsig in signatures:
if re.search(expect, rrsig) is not None:
@ -819,14 +859,16 @@ def _check_signatures(signatures, covers, fqdn, keys):
assert has_rrsig, f"Expected signature but not found: {expect}"
numsigs += 1
if krrsig and not key.is_ksk():
if krrsig and not ksigning:
for rrsig in signatures:
assert re.search(expect, rrsig) is None
return numsigs
def check_signatures(rrset, covers, fqdn, ksks, zsks):
def check_signatures(
rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False
):
# Check if signatures with covering type are signed with the right keys.
# The right keys are the ones that expect a signature and have the
# correct role.
@ -840,8 +882,12 @@ def check_signatures(rrset, covers, fqdn, ksks, zsks):
rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
signatures.append(rrsig)
numsigs += _check_signatures(signatures, covers, fqdn, ksks)
numsigs += _check_signatures(signatures, covers, fqdn, zsks)
numsigs += _check_signatures(
signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
)
numsigs += _check_signatures(
signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing
)
assert numsigs == len(signatures)
@ -965,7 +1011,9 @@ def _query_rrset(server, fqdn, qtype, tsig=None):
return rrs, rrsigs
def check_apex(server, zone, ksks, zsks, tsig=None):
def check_apex(
server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None
):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
@ -973,30 +1021,44 @@ def check_apex(server, zone, ksks, zsks, tsig=None):
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
check_dnskeys(dnskeys, ksks, zsks)
check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
check_signatures(
rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
check_signatures(
rrsigs,
dns.rdatatype.SOA,
fqdn,
ksks,
zsks,
offline_ksk=offline_ksk,
zsk_missing=zsk_missing,
)
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
check_signatures(
rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
check_signatures(
rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk
)
def check_subdomain(server, zone, ksks, zsks, tsig=None):
def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
@ -1014,7 +1076,7 @@ def check_subdomain(server, zone, ksks, zsks, tsig=None):
else:
assert match in rrset.to_text()
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk)
def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None):

View File

@ -673,9 +673,9 @@ def test_ksr_common(servers):
# - check keys
check_keys(overlapping_zsks, lifetime, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks)
isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks)
isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
def test_ksr_lastbundle(servers):
@ -748,9 +748,9 @@ def test_ksr_lastbundle(servers):
# - check keys
check_keys(zsks, lifetime, offset=offset, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
# check that last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
@ -828,9 +828,9 @@ def test_ksr_inthemiddle(servers):
# - check keys
check_keys(zsks, lifetime, offset=offset, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
# check that no last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
@ -1023,9 +1023,9 @@ def test_ksr_unlimited(servers):
# - check keys
check_keys(zsks, lifetime, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
def test_ksr_twotone(servers):
@ -1141,9 +1141,9 @@ def test_ksr_twotone(servers):
lifetime = timedelta(days=31 * 5)
check_keys(zsks_altalg, lifetime, alg, size, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
def test_ksr_kskroll(servers):
@ -1215,6 +1215,6 @@ def test_ksr_kskroll(servers):
# - check keys
check_keys(zsks, None, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)