From 12e57eb222c3e4e721d5978d41c84efe0caadd77 Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Fri, 14 Mar 2025 11:10:22 +0100 Subject: [PATCH] Introduce pytest check_next_key_event, get_keyids For the kasp tests we need a new utility that can retrieve a list of Keys from a given directory, belonging to a specific zone. This is 'keydir_to_keylist' and is the replacement of 'kasp.sh:get_keyids()'. 'next_key_event_eqauls' is a method to check when the next key event is scheduled, needed for the rollover tests, and is the equivalent of shell script 'check_next_key_event'. --- bin/tests/system/isctest/kasp.py | 40 ++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 1cab813896..fa3c297f78 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -10,6 +10,7 @@ # information regarding copyright ownership. from functools import total_ordering +import glob import os from pathlib import Path import re @@ -25,6 +26,8 @@ import isctest.query DEFAULT_TTL = 300 +NEXT_KEY_EVENT_THRESHOLD = 100 + def _query(server, qname, qtype): query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) @@ -1010,6 +1013,43 @@ def check_subdomain(server, zone, ksks, zsks): check_signatures(rrsigs, qtype, fqdn, ksks, zsks) +def next_key_event_equals(server, zone, next_event): + if next_event is None: + # No next key event check. + return True + + val = int(next_event.total_seconds()) + if val == 3600: + waitfor = rf".*zone {zone}.*: next key event in (.*) seconds" + else: + # Don't want default loadkeys interval. + waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" + + with server.watch_log_from_start() as watcher: + watcher.wait_for_line(re.compile(waitfor)) + + # WMM: The with code below is extracting the line the watcher was + # waiting for. If WatchLog.wait_for_line()` returned the matched string, + # we can use it directly on `re.match`. + next_found = False + minval = val - NEXT_KEY_EVENT_THRESHOLD + maxval = val + NEXT_KEY_EVENT_THRESHOLD + with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp: + for line in fp: + match = re.match(waitfor, line) + if match is not None: + nextval = int(match.group(1)) + if minval <= nextval <= maxval: + next_found = True + break + + isctest.log.debug( + f"check next key event: expected {val} in: {line.strip()}" + ) + + return next_found + + def keydir_to_keylist( zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False ) -> List[Key]: