2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 06:25:31 +00:00

Move shared test code into isctest.kasp module

Move key calculations and rollover step checks into the shared
isctest.kasp module. Deduplicate the key interval calculations.
This commit is contained in:
Nicki Křížek
2025-06-02 17:20:06 +02:00
parent 784a252425
commit b410710354
2 changed files with 144 additions and 174 deletions

View File

@@ -49,6 +49,55 @@ def _query(server, qname, qtype, tsig=None):
return response
def Ipub(config):
return (
config["dnskey-ttl"]
+ config["zone-propagation-delay"]
+ config["publish-safety"]
)
def IpubC(config, rollover=True):
ttl1 = config["dnskey-ttl"] + config["publish-safety"]
ttl2 = timedelta(0)
if not rollover:
# If this is the first key, we also need to wait until the zone
# signatures are omnipresent. Use max-zone-ttl instead of
# dnskey-ttl, and no publish-safety (because we are looking at
# signatures here, not the public key).
ttl2 = config["max-zone-ttl"]
return config["zone-propagation-delay"] + max(ttl1, ttl2)
def Iret(config, zsk=True, ksk=False, rollover=True):
sign_delay = timedelta(0)
safety_interval = timedelta(0)
if rollover:
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
safety_interval = config["retire-safety"]
iretKSK = timedelta(0)
if ksk:
# KSK: Double-KSK Method: Iret = DprpP + TTLds
iretKSK = (
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
)
iretZSK = timedelta(0)
if zsk:
# ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig
iretZSK = (
sign_delay
+ config["zone-propagation-delay"]
+ config["max-zone-ttl"]
+ safety_interval
)
return max(iretKSK, iretZSK)
@total_ordering
class KeyTimingMetadata:
"""
@@ -174,12 +223,7 @@ class KeyProperties:
ipub = timedelta(0)
if self.key.get_metadata("Predecessor", must_exist=False) != "undefined":
# Ipub = Dprp + TTLkey
ipub = (
config["dnskey-ttl"]
+ config["zone-propagation-delay"]
+ config["publish-safety"]
)
ipub = Ipub(config)
self.timing["Active"] = self.timing["Published"] + ipub
@@ -187,18 +231,8 @@ class KeyProperties:
if not self.key.is_ksk():
return
ttl1 = config["dnskey-ttl"] + config["publish-safety"]
ttl2 = timedelta(0)
if self.key.get_metadata("Predecessor", must_exist=False) == "undefined":
# If this is the first key, we also need to wait until the zone
# signatures are omnipresent. Use max-zone-ttl instead of
# dnskey-ttl, and no publish-safety (because we are looking at
# signatures here, not the public key).
ttl2 = config["max-zone-ttl"]
# IpubC = DprpC + TTLkey
ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2)
rollover = self.key.get_metadata("Predecessor", must_exist=False) != "undefined"
ipubc = IpubC(config, rollover)
self.timing["PublishCDS"] = self.timing["Published"] + ipubc
@@ -211,26 +245,8 @@ class KeyProperties:
if self.metadata["Lifetime"] == 0:
return
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
safety_interval = config["retire-safety"]
iretKSK = timedelta(0)
iretZSK = timedelta(0)
if self.key.is_ksk():
# Iret = DprpP + TTLds
iretKSK = (
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
)
if self.key.is_zsk():
# Iret = Dsgn + Dprp + TTLsig
iretZSK = (
sign_delay
+ config["zone-propagation-delay"]
+ config["max-zone-ttl"]
+ safety_interval
)
self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK)
iret = Iret(config, zsk=self.key.is_zsk(), ksk=self.key.is_ksk())
self.timing["Removed"] = self.timing["Retired"] + iret
def set_expected_keytimes(self, config, offset=None, pregenerated=False):
if self.key is None:
@@ -1149,6 +1165,88 @@ def check_subdomain(
)
def check_rollover_step(server, config, policy, step):
zone = step["zone"]
keyprops = step["keyprops"]
nextev = step["nextev"]
cdss = step.get("cdss", None)
keyrelationships = step.get("keyrelationships", None)
smooth = step.get("smooth", False)
ds_swap = step.get("ds-swap", True)
cds_delete = step.get("cds-delete", False)
check_keytimes_flag = step.get("check-keytimes", True)
zone_signed = step.get("zone-signed", True)
isctest.log.info(f"check rollover step {zone}")
if zone_signed:
check_dnssec_verify(server, zone)
ttl = int(config["dnskey-ttl"].total_seconds())
expected = policy_to_properties(ttl, keyprops)
keys = keydir_to_keylist(zone, server.identifier)
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
check_keys(zone, keys, expected)
for kp in expected:
key = kp.key
# Set expected key timing metadata.
kp.set_expected_keytimes(config)
# Set rollover relationships.
if keyrelationships is not None:
prd = keyrelationships[0]
suc = keyrelationships[1]
expected[prd].metadata["Successor"] = expected[suc].key.tag
expected[suc].metadata["Predecessor"] = expected[prd].key.tag
check_keyrelationships(keys, expected)
# Policy changes may retire keys, set expected timing metadata.
if kp.metadata["GoalState"] == "hidden" and "Retired" not in kp.timing:
retired = kp.key.get_timing("Inactive")
kp.timing["Retired"] = retired
kp.timing["Removed"] = retired + Iret(
config, zsk=key.is_zsk(), ksk=key.is_ksk()
)
# Check that CDS publication/withdrawal is logged.
if "KSK" not in kp.metadata:
continue
if kp.metadata["KSK"] == "no":
continue
if ds_swap and kp.metadata["DSState"] == "rumoured":
assert cdss is not None
for algstr in ["CDNSKEY", "CDS (SHA-256)", "CDS (SHA-384)"]:
if algstr in cdss:
check_cdslog(server, zone, key, algstr)
else:
check_cdslog_prohibit(server, zone, key, algstr)
# The DS can be introduced. We ignore any parent registration delay,
# so set the DS publish time to now.
server.rndc(f"dnssec -checkds -key {key.tag} published {zone}")
if ds_swap and kp.metadata["DSState"] == "unretentive":
# The DS can be withdrawn. We ignore any parent registration
# delay, so set the DS withdraw time to now.
server.rndc(f"dnssec -checkds -key {key.tag} withdrawn {zone}")
if check_keytimes_flag:
check_keytimes(keys, expected)
check_dnssecstatus(server, zone, keys, policy=policy)
check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete)
check_subdomain(server, zone, ksks, zsks, smooth=smooth)
def check_next_key_event():
return next_key_event_equals(server, zone, nextev)
isctest.run.retry_with_timeout(check_next_key_event, timeout=5)
def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None):
"""
Test an RRset below the apex and verify it is updated and signed correctly.

View File

@@ -19,7 +19,7 @@ pytest.importorskip("dns", minversion="2.0.0")
import dns.update
import isctest
from isctest.kasp import KeyTimingMetadata
from isctest.kasp import KeyTimingMetadata, Ipub, IpubC, Iret
pytestmark = pytest.mark.extra_artifacts(
[
@@ -46,52 +46,6 @@ pytestmark = pytest.mark.extra_artifacts(
)
def Ipub(config):
return (
config["dnskey-ttl"]
+ config["zone-propagation-delay"]
+ config["publish-safety"]
)
def IpubC(config, rollover=True):
if rollover:
ttl = config["dnskey-ttl"]
safety_interval = config["publish-safety"]
else:
ttl = config["max-zone-ttl"]
safety_interval = timedelta(0)
return ttl + config["zone-propagation-delay"] + safety_interval
def Iret(config, zsk=True, ksk=False, rollover=True):
sign_delay = timedelta(0)
safety_interval = timedelta(0)
if rollover:
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
safety_interval = config["retire-safety"]
iretKSK = timedelta(0)
if ksk:
# KSK: Double-KSK Method: Iret = DprpP + TTLds
iretKSK = (
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
)
iretZSK = timedelta(0)
if zsk:
# ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig
iretZSK = (
sign_delay
+ config["zone-propagation-delay"]
+ config["max-zone-ttl"]
+ safety_interval
)
return max(iretKSK, iretZSK)
def test_rollover_manual(servers):
server = servers["ns3"]
policy = "manual-rollover"
@@ -378,88 +332,6 @@ def test_rollover_multisigner(servers):
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
def check_rollover_step(server, config, policy, step):
zone = step["zone"]
keyprops = step["keyprops"]
nextev = step["nextev"]
cdss = step.get("cdss", None)
keyrelationships = step.get("keyrelationships", None)
smooth = step.get("smooth", False)
ds_swap = step.get("ds-swap", True)
cds_delete = step.get("cds-delete", False)
check_keytimes = step.get("check-keytimes", True)
zone_signed = step.get("zone-signed", True)
isctest.log.info(f"check rollover step {zone}")
if zone_signed:
isctest.kasp.check_dnssec_verify(server, zone)
ttl = int(config["dnskey-ttl"].total_seconds())
expected = isctest.kasp.policy_to_properties(ttl, keyprops)
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
isctest.kasp.check_keys(zone, keys, expected)
for kp in expected:
key = kp.key
# Set expected key timing metadata.
kp.set_expected_keytimes(config)
# Set rollover relationships.
if keyrelationships is not None:
prd = keyrelationships[0]
suc = keyrelationships[1]
expected[prd].metadata["Successor"] = expected[suc].key.tag
expected[suc].metadata["Predecessor"] = expected[prd].key.tag
isctest.kasp.check_keyrelationships(keys, expected)
# Policy changes may retire keys, set expected timing metadata.
if kp.metadata["GoalState"] == "hidden" and "Retired" not in kp.timing:
retired = kp.key.get_timing("Inactive")
kp.timing["Retired"] = retired
kp.timing["Removed"] = retired + Iret(
config, zsk=key.is_zsk(), ksk=key.is_ksk()
)
# Check that CDS publication/withdrawal is logged.
if "KSK" not in kp.metadata:
continue
if kp.metadata["KSK"] == "no":
continue
if ds_swap and kp.metadata["DSState"] == "rumoured":
assert cdss is not None
for algstr in ["CDNSKEY", "CDS (SHA-256)", "CDS (SHA-384)"]:
if algstr in cdss:
isctest.kasp.check_cdslog(server, zone, key, algstr)
else:
isctest.kasp.check_cdslog_prohibit(server, zone, key, algstr)
# The DS can be introduced. We ignore any parent registration delay,
# so set the DS publish time to now.
server.rndc(f"dnssec -checkds -key {key.tag} published {zone}")
if ds_swap and kp.metadata["DSState"] == "unretentive":
# The DS can be withdrawn. We ignore any parent registration
# delay, so set the DS withdraw time to now.
server.rndc(f"dnssec -checkds -key {key.tag} withdrawn {zone}")
if check_keytimes:
isctest.kasp.check_keytimes(keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
isctest.kasp.check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete)
isctest.kasp.check_subdomain(server, zone, ksks, zsks, smooth=smooth)
def check_next_key_event():
return isctest.kasp.next_key_event_equals(server, zone, nextev)
isctest.run.retry_with_timeout(check_next_key_event, timeout=5)
def test_rollover_enable_dnssec(servers):
server = servers["ns3"]
policy = "enable-dnssec"
@@ -546,7 +418,7 @@ def test_rollover_enable_dnssec(servers):
]
for step in steps:
check_rollover_step(server, config, policy, step)
isctest.kasp.check_rollover_step(server, config, policy, step)
def test_rollover_zsk_prepublication(servers):
@@ -685,7 +557,7 @@ def test_rollover_zsk_prepublication(servers):
]
for step in steps:
check_rollover_step(server, config, policy, step)
isctest.kasp.check_rollover_step(server, config, policy, step)
def test_rollover_ksk_doubleksk(servers):
@@ -835,7 +707,7 @@ def test_rollover_ksk_doubleksk(servers):
]
for step in steps:
check_rollover_step(server, config, policy, step)
isctest.kasp.check_rollover_step(server, config, policy, step)
# Test #2375: Scheduled rollovers are happening faster than they can finish.
zone = "three-is-a-crowd.kasp"
@@ -1088,7 +960,7 @@ def test_rollover_csk_roll1(servers):
]
for step in steps:
check_rollover_step(server, config, policy, step)
isctest.kasp.check_rollover_step(server, config, policy, step)
def test_rollover_csk_roll2(servers):
@@ -1271,7 +1143,7 @@ def test_rollover_csk_roll2(servers):
]
for step in steps:
check_rollover_step(server, config, policy, step)
isctest.kasp.check_rollover_step(server, config, policy, step)
def test_rollover_policy_changes(servers, templates):
@@ -1440,7 +1312,7 @@ def test_rollover_policy_changes(servers, templates):
steps.append(step)
for step in steps:
check_rollover_step(server, step["config"], step["policy"], step)
isctest.kasp.check_rollover_step(server, step["config"], step["policy"], step)
# Reconfigure, changing DNSSEC policies and other configuration options,
# triggering algorithm rollovers and other dnssec-policy changes.
@@ -1811,4 +1683,4 @@ def test_rollover_policy_changes(servers, templates):
steps = steps + algo_steps
for step in steps:
check_rollover_step(server, step["config"], step["policy"], step)
isctest.kasp.check_rollover_step(server, step["config"], step["policy"], step)