mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-30 22:15:20 +00:00
Introduce pytest check_next_key_event, get_keyids
For the kasp tests we need a new utility that can retrieve a list of Keys from a given directory, belonging to a specific zone. This is 'keydir_to_keylist' and is the replacement of 'kasp.sh:get_keyids()'. 'next_key_event_eqauls' is a method to check when the next key event is scheduled, needed for the rollover tests, and is the equivalent of shell script 'check_next_key_event'.
This commit is contained in:
@@ -10,6 +10,7 @@
|
|||||||
# information regarding copyright ownership.
|
# information regarding copyright ownership.
|
||||||
|
|
||||||
from functools import total_ordering
|
from functools import total_ordering
|
||||||
|
import glob
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import re
|
import re
|
||||||
@@ -25,6 +26,8 @@ import isctest.query
|
|||||||
|
|
||||||
DEFAULT_TTL = 300
|
DEFAULT_TTL = 300
|
||||||
|
|
||||||
|
NEXT_KEY_EVENT_THRESHOLD = 100
|
||||||
|
|
||||||
|
|
||||||
def _query(server, qname, qtype):
|
def _query(server, qname, qtype):
|
||||||
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
|
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
|
||||||
@@ -1010,6 +1013,43 @@ def check_subdomain(server, zone, ksks, zsks):
|
|||||||
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
|
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
|
||||||
|
|
||||||
|
|
||||||
|
def next_key_event_equals(server, zone, next_event):
|
||||||
|
if next_event is None:
|
||||||
|
# No next key event check.
|
||||||
|
return True
|
||||||
|
|
||||||
|
val = int(next_event.total_seconds())
|
||||||
|
if val == 3600:
|
||||||
|
waitfor = rf".*zone {zone}.*: next key event in (.*) seconds"
|
||||||
|
else:
|
||||||
|
# Don't want default loadkeys interval.
|
||||||
|
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
|
||||||
|
|
||||||
|
with server.watch_log_from_start() as watcher:
|
||||||
|
watcher.wait_for_line(re.compile(waitfor))
|
||||||
|
|
||||||
|
# WMM: The with code below is extracting the line the watcher was
|
||||||
|
# waiting for. If WatchLog.wait_for_line()` returned the matched string,
|
||||||
|
# we can use it directly on `re.match`.
|
||||||
|
next_found = False
|
||||||
|
minval = val - NEXT_KEY_EVENT_THRESHOLD
|
||||||
|
maxval = val + NEXT_KEY_EVENT_THRESHOLD
|
||||||
|
with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp:
|
||||||
|
for line in fp:
|
||||||
|
match = re.match(waitfor, line)
|
||||||
|
if match is not None:
|
||||||
|
nextval = int(match.group(1))
|
||||||
|
if minval <= nextval <= maxval:
|
||||||
|
next_found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
isctest.log.debug(
|
||||||
|
f"check next key event: expected {val} in: {line.strip()}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return next_found
|
||||||
|
|
||||||
|
|
||||||
def keydir_to_keylist(
|
def keydir_to_keylist(
|
||||||
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
|
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
|
||||||
) -> List[Key]:
|
) -> List[Key]:
|
||||||
|
Reference in New Issue
Block a user