diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 3419ffefec..03dedd9aa9 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -829,7 +829,13 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None): def _check_signatures( - signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False, smooth=False + signatures, + covers, + fqdn, + keys, + offline_ksk=False, + zsk_missing=False, + smooth=False, ): numsigs = 0 zrrsig = True @@ -917,7 +923,7 @@ def check_signatures( assert numsigs == len(signatures) -def _check_dnskeys(dnskeys, keys, cdnskey=False): +def _check_dnskeys(dnskeys, keys, cdnskey=False, manual_mode=False): now = KeyTimingMetadata.now() numkeys = 0 @@ -928,10 +934,21 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False): delete_md = f"Sync{delete_md}" for key in keys: - publish = key.get_timing(publish_md, must_exist=False) - delete = key.get_timing(delete_md, must_exist=False) - published = publish is not None and now >= publish - removed = delete is not None and delete <= now + if manual_mode: + # State transitions may be blocked, preset key timings may not + # be accurate. Use the state values to determine whether the + # CDS must be published or not. + if cdnskey: + md = key.get_metadata("DSState") + else: + md = key.get_metadata("DNSKEYState") + published = md in ["omnipresent", "rumoured"] + removed = not published + else: + publish = key.get_timing(publish_md, must_exist=False) + delete = key.get_timing(delete_md, must_exist=False) + published = publish is not None and now >= publish + removed = delete is not None and delete <= now if not published or removed: for dnskey in dnskeys: @@ -954,7 +971,7 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False): return numkeys -def check_dnskeys(rrset, ksks, zsks, cdnskey=False): +def check_dnskeys(rrset, ksks, zsks, cdnskey=False, manual_mode=False): # Check if the correct DNSKEY records are published. If the current time # is between the timing metadata 'publish' and 'delete', the key must have # a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY @@ -969,14 +986,14 @@ def check_dnskeys(rrset, ksks, zsks, cdnskey=False): dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" dnskeys.append(dnskey) - numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey) + numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey, manual_mode=manual_mode) if not cdnskey: - numkeys += _check_dnskeys(dnskeys, zsks) + numkeys += _check_dnskeys(dnskeys, zsks, manual_mode=manual_mode) assert numkeys == len(dnskeys) -def check_cds(cdss, keys, alg): +def check_cds(cdss, keys, alg, manual_mode=False): # Check if the correct CDS records are published. If the current time # is between the timing metadata 'publish' and 'delete', the key must have # a CDS record published. @@ -986,10 +1003,19 @@ def check_cds(cdss, keys, alg): for key in keys: assert key.is_ksk() - publish = key.get_timing("SyncPublish") - delete = key.get_timing("SyncDelete", must_exist=False) - published = now >= publish - removed = delete is not None and delete <= now + if manual_mode: + # State transitions may be blocked, preset key timings may not + # be accurate. Use the state values to determine whether the + # CDS must be published or not. + md = key.get_metadata("DSState") + published = md in ["omnipresent", "rumoured"] + removed = not published + else: + publish = key.get_timing("SyncPublish") + delete = key.get_timing("SyncDelete", must_exist=False) + published = now >= publish + removed = delete is not None and delete <= now + if not published or removed: for cds in cdss: assert not key.cds_equals(cds, alg) @@ -1069,6 +1095,7 @@ def check_apex( zsks, cdss=None, cds_delete=False, + manual_mode=False, offline_ksk=False, zsk_missing=False, tsig=None, @@ -1082,7 +1109,7 @@ def check_apex( # test dnskey query dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) - check_dnskeys(dnskeys, ksks, zsks) + check_dnskeys(dnskeys, ksks, zsks, manual_mode=manual_mode) check_signatures( rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk ) @@ -1108,7 +1135,7 @@ def check_apex( check_cdsdelete(cdnskeys, "0 3 0 AA==") else: if "CDNSKEY" in cdss: - check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) + check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True, manual_mode=manual_mode) else: assert len(cdnskeys) == 0 @@ -1136,7 +1163,7 @@ def check_apex( for alg in ["SHA-256", "SHA-384"]: if f"CDS ({alg})" in cdss: - numcds += check_cds(cdsrrs, ksks, alg) + numcds += check_cds(cdsrrs, ksks, alg, manual_mode=manual_mode) else: check_cds_prohibit(cdsrrs, ksks, alg) @@ -1185,6 +1212,7 @@ def check_rollover_step(server, config, policy, step): cds_delete = step.get("cds-delete", False) check_keytimes_flag = step.get("check-keytimes", True) zone_signed = step.get("zone-signed", True) + manual_mode = step.get("manual-mode", False) isctest.log.info(f"check rollover step {zone}") @@ -1247,7 +1275,15 @@ def check_rollover_step(server, config, policy, step): check_keytimes(keys, expected) check_dnssecstatus(server, zone, keys, policy=policy) - check_apex(server, zone, ksks, zsks, cdss=cdss, cds_delete=cds_delete) + check_apex( + server, + zone, + ksks, + zsks, + cdss=cdss, + cds_delete=cds_delete, + manual_mode=manual_mode, + ) check_subdomain(server, zone, ksks, zsks, smooth=smooth) def check_next_key_event():