diff --git a/bin/tests/system/isctest/check.py b/bin/tests/system/isctest/check.py index 0906ad92ff..b35dfe848e 100644 --- a/bin/tests/system/isctest/check.py +++ b/bin/tests/system/isctest/check.py @@ -9,6 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +import difflib import shutil from typing import Optional @@ -128,3 +129,24 @@ def is_response_to(response: dns.message.Message, query: dns.message.Message) -> single_question(response) single_question(query) assert query.is_response(response), str(response) + + +def file_contents_equal(file1, file2): + def normalize_line(line): + # remove trailing&leading whitespace and replace multiple whitespaces + return " ".join(line.split()) + + def read_lines(file_path): + with open(file_path, "r", encoding="utf-8") as file: + return [normalize_line(line) for line in file.readlines()] + + lines1 = read_lines(file1) + lines2 = read_lines(file2) + + differ = difflib.Differ() + diff = differ.compare(lines1, lines2) + + for line in diff: + assert not line.startswith("+ ") and not line.startswith( + "- " + ), f'file contents of "{file1}" and "{file2}" differ' diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 1fbb489319..1b82baf416 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -10,24 +10,34 @@ # information regarding copyright ownership. from functools import total_ordering +import glob import os from pathlib import Path import re import subprocess import time -from typing import Optional, Union +from typing import Dict, List, Optional, Union from datetime import datetime, timedelta, timezone import dns +import dns.tsig import isctest.log import isctest.query DEFAULT_TTL = 300 +NEXT_KEY_EVENT_THRESHOLD = 100 -def _query(server, qname, qtype): + +def _query(server, qname, qtype, tsig=None): query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) + + if tsig is not None: + tsigkey = tsig.split(":") + keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0]) + query.use_tsig(keyring) + try: response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3) except dns.exception.Timeout: @@ -100,6 +110,165 @@ class KeyTimingMetadata: return result +class KeyProperties: + """ + Represent the (expected) properties a key should have. + """ + + def __init__( + self, + name: str, + properties: dict, + metadata: dict, + timing: Dict[str, KeyTimingMetadata], + ): + self.name = name + self.key = None + self.properties = properties + self.metadata = metadata + self.timing = timing + + def __repr__(self): + return self.name + + def __str__(self) -> str: + return self.name + + @staticmethod + def default(with_state=True) -> "KeyProperties": + properties = { + "expect": True, + "private": True, + "legacy": False, + "role": "csk", + "role_full": "key-signing", + "dnskey_ttl": 3600, + "flags": 257, + } + metadata = { + "Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number, + "Length": 256, + "Lifetime": 0, + "KSK": "yes", + "ZSK": "yes", + } + timing: Dict[str, KeyTimingMetadata] = {} + + result = KeyProperties( + name="DEFAULT", properties=properties, metadata=metadata, timing=timing + ) + result.name = "DEFAULT" + result.key = None + if with_state: + result.metadata["GoalState"] = "omnipresent" + result.metadata["DNSKEYState"] = "rumoured" + result.metadata["KRRSIGState"] = "rumoured" + result.metadata["ZRRSIGState"] = "rumoured" + result.metadata["DSState"] = "hidden" + + return result + + def Ipub(self, config): + ipub = timedelta(0) + + if self.key.get_metadata("Predecessor", must_exist=False) != "undefined": + # Ipub = Dprp + TTLkey + ipub = ( + config["dnskey-ttl"] + + config["zone-propagation-delay"] + + config["publish-safety"] + ) + + self.timing["Active"] = self.timing["Published"] + ipub + + def IpubC(self, config): + if not self.key.is_ksk(): + return + + ttl1 = config["dnskey-ttl"] + config["publish-safety"] + ttl2 = timedelta(0) + + if self.key.get_metadata("Predecessor", must_exist=False) == "undefined": + # If this is the first key, we also need to wait until the zone + # signatures are omnipresent. Use max-zone-ttl instead of + # dnskey-ttl, and no publish-safety (because we are looking at + # signatures here, not the public key). + ttl2 = config["max-zone-ttl"] + + # IpubC = DprpC + TTLkey + ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2) + + self.timing["PublishCDS"] = self.timing["Published"] + ipubc + + if self.metadata["Lifetime"] != 0: + self.timing["DeleteCDS"] = ( + self.timing["PublishCDS"] + self.metadata["Lifetime"] + ) + + def Iret(self, config): + if self.metadata["Lifetime"] == 0: + return + + sign_delay = config["signatures-validity"] - config["signatures-refresh"] + safety_interval = config["retire-safety"] + + iretKSK = timedelta(0) + iretZSK = timedelta(0) + if self.key.is_ksk(): + # Iret = DprpP + TTLds + iretKSK = ( + config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval + ) + if self.key.is_zsk(): + # Iret = Dsgn + Dprp + TTLsig + iretZSK = ( + sign_delay + + config["zone-propagation-delay"] + + config["max-zone-ttl"] + + safety_interval + ) + + self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK) + + def set_expected_keytimes(self, config, offset=None, pregenerated=False): + if self.key is None: + raise ValueError("KeyProperties must be attached to a Key") + + if self.properties["legacy"]: + return + + if offset is None: + offset = self.properties["offset"] + + self.timing["Generated"] = self.key.get_timing("Created") + + self.timing["Published"] = self.timing["Generated"] + if pregenerated: + self.timing["Published"] = self.key.get_timing("Publish") + self.timing["Published"] = self.timing["Published"] + offset + self.Ipub(config) + + # Set Retired timing metadata if key has lifetime. + if self.metadata["Lifetime"] != 0: + self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"] + + self.IpubC(config) + self.Iret(config) + + # Key state change times must exist, but since we cannot reliably tell + # when named made the actual state change, we don't care what the + # value is. Set it to None will verify that the metadata exists, but + # without actual checking the value. + self.timing["DNSKEYChange"] = None + + if self.key.is_ksk(): + self.timing["DSChange"] = None + self.timing["KRRSIGChange"] = None + + if self.key.is_zsk(): + self.timing["ZRRSIGChange"] = None + + @total_ordering class Key: """ @@ -117,6 +286,7 @@ class Key: else: self.keydir = Path(keydir) self.path = str(self.keydir / name) + self.privatefile = f"{self.path}.private" self.keyfile = f"{self.path}.key" self.statefile = f"{self.path}.state" self.tag = int(self.name[-5:]) @@ -139,21 +309,43 @@ class Key: ) return None - def get_metadata(self, metadata: str, must_exist=True) -> str: + def get_metadata( + self, metadata: str, file=None, comment=False, must_exist=True + ) -> str: + if file is None: + file = self.statefile value = "undefined" - regex = rf"{metadata}:\s+(.*)" - with open(self.statefile, "r", encoding="utf-8") as file: - for line in file: + regex = rf"{metadata}:\s+(\S+).*" + if comment: + # The expected metadata is prefixed with a ';'. + regex = rf";\s+{metadata}:\s+(\S+).*" + with open(file, "r", encoding="utf-8") as fp: + for line in fp: match = re.match(regex, line) if match is not None: value = match.group(1) break if must_exist and value == "undefined": raise ValueError( - 'state metadata "{metadata}" for key "{self.name}" undefined' + f'metadata "{metadata}" for key "{self.name}" in file "{file}" undefined' ) return value + def ttl(self) -> int: + with open(self.keyfile, "r", encoding="utf-8") as file: + for line in file: + if line.startswith(";"): + continue + return int(line.split()[1]) + return 0 + + def dnskey(self): + with open(self.keyfile, "r", encoding="utf-8") as file: + for line in file: + if "DNSKEY" in line: + return line.strip() + return "undefined" + def is_ksk(self) -> bool: return self.get_metadata("KSK") == "yes" @@ -187,7 +379,7 @@ class Key: dsfromkey_command = [ os.environ.get("DSFROMKEY"), "-T", - "3600", + str(self.ttl()), "-a", alg, "-C", @@ -216,6 +408,149 @@ class Key: return digest_fromfile == digest_fromwire + def is_metadata_consistent(self, key, metadata, checkval=True): + """ + If 'key' exists in 'metadata' then it must also exist in the state + meta data. Otherwise, it must not exist in the state meta data. + If 'checkval' is True, the meta data values must also match. + """ + if key in metadata: + if checkval: + value = self.get_metadata(key) + if value != f"{metadata[key]}": + isctest.log.debug( + f"{self.name} {key} METADATA MISMATCH: {value} - {metadata[key]}" + ) + return value == f"{metadata[key]}" + + return self.get_metadata(key) != "undefined" + + value = self.get_metadata(key, must_exist=False) + if value != "undefined": + isctest.log.debug(f"{self.name} {key} METADATA UNEXPECTED: {value}") + return value == "undefined" + + def is_timing_consistent(self, key, timing, file, comment=False): + """ + If 'key' exists in 'timing' then it must match the value in the state + timing data. Otherwise, it must also not exist in the state timing data. + """ + if key in timing: + value = self.get_metadata(key, file=file, comment=comment) + if value != str(timing[key]): + isctest.log.debug( + f"{self.name} {key} TIMING MISMATCH: {value} - {timing[key]}" + ) + return value == str(timing[key]) + + value = self.get_metadata(key, file=file, comment=comment, must_exist=False) + if value != "undefined": + isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}") + return value == "undefined" + + def match_properties(self, zone, properties): + """ + Check the key with given properties. + """ + if not properties.properties["expect"]: + return False + + # Check file existence. + # Noop. If file is missing then the get_metadata calls will fail. + + # Check the public key file. + role = properties.properties["role_full"] + comment = f"This is a {role} key, keyid {self.tag}, for {zone}." + if not isctest.util.file_contents_contain(self.keyfile, comment): + isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'") + return False + + ttl = properties.properties["dnskey_ttl"] + flags = properties.properties["flags"] + alg = properties.metadata["Algorithm"] + dnskey = f"{zone}. {ttl} IN DNSKEY {flags} 3 {alg}" + if not isctest.util.file_contents_contain(self.keyfile, dnskey): + isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'") + return False + + # Now check the private key file. + if properties.properties["private"]: + # Retrieve creation date. + created = self.get_metadata("Generated") + + pval = self.get_metadata("Created", file=self.privatefile) + if pval != created: + isctest.log.debug( + f"{self.name} Created METADATA MISMATCH: {pval} - {created}" + ) + return False + pval = self.get_metadata("Private-key-format", file=self.privatefile) + if pval != "v1.3": + isctest.log.debug( + f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3" + ) + return False + pval = self.get_metadata("Algorithm", file=self.privatefile) + if pval != f"{alg}": + isctest.log.debug( + f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}" + ) + return False + + # Now check the key state file. + if properties.properties["legacy"]: + return True + + comment = f"This is the state of key {self.tag}, for {zone}." + if not isctest.util.file_contents_contain(self.statefile, comment): + isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'") + return False + + attributes = [ + "Lifetime", + "Algorithm", + "Length", + "KSK", + "ZSK", + "GoalState", + "DNSKEYState", + "KRRSIGState", + "ZRRSIGState", + "DSState", + ] + for key in attributes: + if not self.is_metadata_consistent(key, properties.metadata): + return False + + # A match is found. + return True + + def match_timingmetadata(self, timings, file=None, comment=False): + if file is None: + file = self.statefile + + attributes = [ + "Generated", + "Created", + "Published", + "Publish", + "PublishCDS", + "SyncPublish", + "Active", + "Activate", + "Retired", + "Inactive", + "Revoked", + "Removed", + "Delete", + ] + for key in attributes: + if not self.is_timing_consistent(key, timings, file, comment=comment): + isctest.log.debug(f"{self.name} TIMING METADATA MISMATCH: {key}") + return False + + return True + def __lt__(self, other: "Key"): return self.name < other.name @@ -226,14 +561,14 @@ class Key: return self.path -def check_zone_is_signed(server, zone): +def check_zone_is_signed(server, zone, tsig=None): addr = server.ip fqdn = f"{zone}." # wait until zone is fully signed signed = False for _ in range(10): - response = _query(server, fqdn, dns.rdatatype.NSEC) + response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig) if not isinstance(response, dns.message.Message): isctest.log.debug(f"no response for {fqdn} NSEC from {addr}") elif response.rcode() != dns.rcode.NOERROR: @@ -277,13 +612,119 @@ def check_zone_is_signed(server, zone): assert signed -def check_dnssec_verify(server, zone): +def verify_keys(zone, keys, expected): + """ + Checks keys for a configured zone. This verifies: + 1. The expected number of keys exist in 'keys'. + 2. The keys match the expected properties. + """ + + def _verify_keys(): + # check number of keys matches expected. + if len(keys) != len(expected): + return False + + if len(keys) == 0: + return True + + for expect in expected: + expect.key = None + + for key in keys: + found = False + i = 0 + while not found and i < len(expected): + if expected[i].key is None: + found = key.match_properties(zone, expected[i]) + if found: + key.external = expected[i].properties["legacy"] + expected[i].key = key + i += 1 + if not found: + return False + + return True + + isctest.run.retry_with_timeout(_verify_keys, timeout=10) + + +def check_keytimes(keys, expected): + """ + Check the key timing metadata for all keys in 'keys'. + """ + assert len(keys) == len(expected) + + if len(keys) == 0: + return + + for key in keys: + for expect in expected: + if expect.properties["legacy"]: + continue + + if not key is expect.key: + continue + + synonyms = {} + if "Generated" in expect.timing: + synonyms["Created"] = expect.timing["Generated"] + if "Published" in expect.timing: + synonyms["Publish"] = expect.timing["Published"] + if "PublishCDS" in expect.timing: + synonyms["SyncPublish"] = expect.timing["PublishCDS"] + if "Active" in expect.timing: + synonyms["Activate"] = expect.timing["Active"] + if "Retired" in expect.timing: + synonyms["Inactive"] = expect.timing["Retired"] + if "DeleteCDS" in expect.timing: + synonyms["SyncDelete"] = expect.timing["DeleteCDS"] + if "Revoked" in expect.timing: + synonyms["Revoked"] = expect.timing["Revoked"] + if "Removed" in expect.timing: + synonyms["Delete"] = expect.timing["Removed"] + + assert key.match_timingmetadata(synonyms, file=key.keyfile, comment=True) + if expect.properties["private"]: + assert key.match_timingmetadata(synonyms, file=key.privatefile) + if not expect.properties["legacy"]: + assert key.match_timingmetadata(expect.timing) + + state_changes = [ + "DNSKEYChange", + "KRRSIGChange", + "ZRRSIGChange", + "DSChange", + ] + for change in state_changes: + assert key.is_metadata_consistent( + change, expect.timing, checkval=False + ) + + +def check_keyrelationships(keys, expected): + """ + Check the key relationships (Successor and Predecessor metadata). + """ + for key in keys: + for expect in expected: + if expect.properties["legacy"]: + continue + + if not key is expect.key: + continue + + relationship_status = ["Predecessor", "Successor"] + for status in relationship_status: + assert key.is_metadata_consistent(status, expect.metadata) + + +def check_dnssec_verify(server, zone, tsig=None): # Check if zone if DNSSEC valid with dnssec-verify. fqdn = f"{zone}." verified = False for _ in range(10): - transfer = _query(server, fqdn, dns.rdatatype.AXFR) + transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig) if not isinstance(transfer, dns.message.Message): isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}") elif transfer.rcode() != dns.rcode.NOERROR: @@ -415,9 +856,9 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False): delete_md = f"Sync{delete_md}" for key in keys: - publish = key.get_timing(publish_md) + publish = key.get_timing(publish_md, must_exist=False) delete = key.get_timing(delete_md, must_exist=False) - published = now >= publish + published = publish is not None and now >= publish removed = delete is not None and delete <= now if not published or removed: @@ -502,8 +943,8 @@ def check_cds(rrset, keys): assert numcds == len(cdss) -def _query_rrset(server, fqdn, qtype): - response = _query(server, fqdn, qtype) +def _query_rrset(server, fqdn, qtype, tsig=None): + response = _query(server, fqdn, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR rrs = [] @@ -523,46 +964,46 @@ def _query_rrset(server, fqdn, qtype): return rrs, rrsigs -def check_apex(server, zone, ksks, zsks): +def check_apex(server, zone, ksks, zsks, tsig=None): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." # test dnskey query - dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY) + dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) assert len(dnskeys) > 0 check_dnskeys(dnskeys, ksks, zsks) assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks) # test soa query - soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA) + soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks) # test cdnskey query - cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY) + cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig) check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) if len(cdnskeys) > 0: assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks) # test cds query - cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS) + cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig) check_cds(cds, ksks) if len(cds) > 0: assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks) -def check_subdomain(server, zone, ksks, zsks): +def check_subdomain(server, zone, ksks, zsks, tsig=None): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." qtype = dns.rdatatype.A - response = _query(server, qname, qtype) + response = _query(server, qname, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1" @@ -577,3 +1018,86 @@ def check_subdomain(server, zone, ksks, zsks): assert len(rrsigs) > 0 check_signatures(rrsigs, qtype, fqdn, ksks, zsks) + + +def next_key_event_equals(server, zone, next_event): + if next_event is None: + # No next key event check. + return True + + val = int(next_event.total_seconds()) + if val == 3600: + waitfor = rf".*zone {zone}.*: next key event in (.*) seconds" + else: + # Don't want default loadkeys interval. + waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" + + with server.watch_log_from_start() as watcher: + watcher.wait_for_line(re.compile(waitfor)) + + # WMM: The with code below is extracting the line the watcher was + # waiting for. If WatchLog.wait_for_line()` returned the matched string, + # we can use it directly on `re.match`. + next_found = False + minval = val - NEXT_KEY_EVENT_THRESHOLD + maxval = val + NEXT_KEY_EVENT_THRESHOLD + with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp: + for line in fp: + match = re.match(waitfor, line) + if match is not None: + nextval = int(match.group(1)) + if minval <= nextval <= maxval: + next_found = True + break + + isctest.log.debug( + f"check next key event: expected {val} in: {line.strip()}" + ) + + return next_found + + +def keydir_to_keylist( + zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False +) -> List[Key]: + """ + Retrieve all keys from the key files in a directory. If 'zone' is None, + retrieve all keys in the directory, otherwise only those matching the + zone name. If 'keydir' is None, search the current directory. + """ + if zone is None: + zone = "" + + all_keys = [] + if keydir is None: + regex = rf"(K{zone}\.\+.*\+.*)\.key" + for filename in glob.glob(f"K{zone}.+*+*.key"): + match = re.match(regex, filename) + if match is not None: + all_keys.append(Key(match.group(1))) + else: + regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key" + for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"): + match = re.match(regex, filename) + if match is not None: + all_keys.append(Key(match.group(1), keydir)) + + states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"] + + def used(kk): + if not in_use: + return True + + for state in states: + val = kk.get_metadata(state, must_exist=False) + if val not in ["undefined", "hidden"]: + isctest.log.debug(f"key {kk} in use") + return True + + return False + + return [k for k in all_keys if used(k)] + + +def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: + return [Key(name, keydir) for name in keystr.split()] diff --git a/bin/tests/system/isctest/util.py b/bin/tests/system/isctest/util.py new file mode 100644 index 0000000000..904e088ff6 --- /dev/null +++ b/bin/tests/system/isctest/util.py @@ -0,0 +1,42 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +import dns.zone + + +def zone_contains( + zone: dns.zone.Zone, rrset: dns.rrset.RRset, compare_ttl=False +) -> bool: + """Check if a zone contains RRset""" + + def compare_rrs(rr1, rrset): + rr2 = next((other_rr for other_rr in rrset if rr1 == other_rr), None) + if rr2 is None: + return False + if compare_ttl: + return rr1.ttl == rr2.ttl + return True + + for _, node in zone.nodes.items(): + for rdataset in node: + for rr in rdataset: + if compare_rrs(rr, rrset): + return True + + return False + + +def file_contents_contain(file, substr): + with open(file, "r", encoding="utf-8") as fp: + for line in fp: + if f"{substr}" in line: + return True + return False diff --git a/bin/tests/system/ksr/tests_ksr.py b/bin/tests/system/ksr/tests_ksr.py index 78724d137b..5512f34fa2 100644 --- a/bin/tests/system/ksr/tests_ksr.py +++ b/bin/tests/system/ksr/tests_ksr.py @@ -10,19 +10,14 @@ # information regarding copyright ownership. from datetime import timedelta -import difflib import os import shutil import time -from typing import List, Optional import pytest import isctest -from isctest.kasp import ( - Key, - KeyTimingMetadata, -) +from isctest.kasp import KeyTimingMetadata pytestmark = pytest.mark.extra_artifacts( [ @@ -89,31 +84,6 @@ def between(value, start, end): return start < value < end -def check_file_contents_equal(file1, file2): - def normalize_line(line): - # remove trailing&leading whitespace and replace multiple whitespaces - return " ".join(line.split()) - - def read_lines(file_path): - with open(file_path, "r", encoding="utf-8") as file: - return [normalize_line(line) for line in file.readlines()] - - lines1 = read_lines(file1) - lines2 = read_lines(file2) - - differ = difflib.Differ() - diff = differ.compare(lines1, lines2) - - for line in diff: - assert not line.startswith("+ ") and not line.startswith( - "- " - ), f'file contents of "{file1}" and "{file2}" differ' - - -def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: - return [Key(name, keydir) for name in keystr.split()] - - def ksr(zone, policy, action, options="", raise_on_exception=True): ksr_command = [ os.environ.get("KSR"), @@ -515,14 +485,14 @@ def test_ksr_common(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None) # check that 'dnssec-ksr keygen' pregenerates right amount of keys out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y") - zsks = keystr_to_keylist(out) + zsks = isctest.kasp.keystr_to_keylist(out) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) @@ -532,7 +502,7 @@ def test_ksr_common(servers): # in the given key directory zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) @@ -575,18 +545,22 @@ def test_ksr_common(servers): # check that 'dnssec-ksr keygen' selects pregenerated keys for # the same time bundle out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y") - selected_zsks = keystr_to_keylist(out, zskdir) + selected_zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(selected_zsks) == 2 for index, key in enumerate(selected_zsks): assert zsks[index] == key - check_file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup") - check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") - check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup") + isctest.check.file_contents_equal( + f"{key.path}.private", f"{key.path}.private.backup" + ) + isctest.check.file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") + isctest.check.file_contents_equal( + f"{key.path}.state", f"{key.path}.state.backup" + ) # check that 'dnssec-ksr keygen' generates only necessary keys for # overlapping time bundle out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1") - overlapping_zsks = keystr_to_keylist(out, zskdir) + overlapping_zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(overlapping_zsks) == 4 verbose = err.split() @@ -606,15 +580,19 @@ def test_ksr_common(servers): for index, key in enumerate(overlapping_zsks): if index < 2: assert zsks[index] == key - check_file_contents_equal( + isctest.check.file_contents_equal( f"{key.path}.private", f"{key.path}.private.backup" ) - check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") - check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup") + isctest.check.file_contents_equal( + f"{key.path}.key", f"{key.path}.key.backup" + ) + isctest.check.file_contents_equal( + f"{key.path}.state", f"{key.path}.state.backup" + ) # run 'dnssec-ksr keygen' again with verbosity 0 out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y") - overlapping_zsks2 = keystr_to_keylist(out, zskdir) + overlapping_zsks2 = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(overlapping_zsks2) == 4 check_keys(overlapping_zsks2, lifetime) for index, key in enumerate(overlapping_zsks2): @@ -709,7 +687,7 @@ def test_ksr_lastbundle(servers): kskdir = "ns1/offline" offset = -timedelta(days=365) out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, offset=offset) @@ -717,7 +695,7 @@ def test_ksr_lastbundle(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) @@ -788,7 +766,7 @@ def test_ksr_inthemiddle(servers): kskdir = "ns1/offline" offset = -timedelta(days=365) out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, offset=offset) @@ -796,7 +774,7 @@ def test_ksr_inthemiddle(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 4 lifetime = timedelta(days=31 * 6) @@ -868,13 +846,13 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end): then = now + offset until = now + end out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # key generation zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 2 # create request @@ -941,7 +919,7 @@ def test_ksr_unlimited(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None) @@ -949,7 +927,7 @@ def test_ksr_unlimited(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 1 lifetime = None @@ -1058,7 +1036,7 @@ def test_ksr_twotone(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 2 ksks_defalg = [] @@ -1082,7 +1060,7 @@ def test_ksr_twotone(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) # First algorithm keys have a lifetime of 3 months, so there should # be 4 created keys. Second algorithm keys have a lifetime of 5 # months, so there should be 3 created keys. While only two time @@ -1176,7 +1154,7 @@ def test_ksr_kskroll(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 2 lifetime = timedelta(days=31 * 6) @@ -1185,7 +1163,7 @@ def test_ksr_kskroll(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 1 check_keys(zsks, None)