2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-21 17:48:07 +00:00
bind/bin/tests/system/ksr/tests_ksr.py
Nicki Křížek 7639c58c48 Disable too-many/too-few pylint checks
Enforcing pylint standards and default for our test code seems
counter-productive. Since most of the newly added code are tests or is
test-related, encountering these checks rarely make us refactor the code
in other ways and we just disable these checks individually. Code that
is too complex or convoluted will be pointed out in reviews anyways.
2024-10-17 11:01:44 +02:00

1054 lines
32 KiB
Python

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from datetime import timedelta
import difflib
import os
import shutil
import time
from typing import List, Optional
import isctest
from isctest.kasp import (
Key,
KeyTimingMetadata,
)
def between(value, start, end):
if value is None or start is None or end is None:
return False
return start < value < end
def check_file_contents_equal(file1, file2):
def normalize_line(line):
# remove trailing&leading whitespace and replace multiple whitespaces
return " ".join(line.split())
def read_lines(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return [normalize_line(line) for line in file.readlines()]
lines1 = read_lines(file1)
lines2 = read_lines(file2)
differ = difflib.Differ()
diff = differ.compare(lines1, lines2)
for line in diff:
assert not line.startswith("+ ") and not line.startswith(
"- "
), f'file contents of "{file1}" and "{file2}" differ'
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
return [Key(name, keydir) for name in keystr.split()]
def keygen(zone, policy, keydir, when="now"):
keygen_command = [
os.environ.get("KEYGEN"),
"-l",
"ns1/named.conf",
"-fK",
"-K",
keydir,
"-k",
policy,
"-P",
when,
"-A",
when,
"-P",
"sync",
when,
zone,
]
return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
def ksr(zone, policy, action, options="", raise_on_exception=True):
ksr_command = [
os.environ.get("KSR"),
"-l",
"ns1/named.conf",
"-k",
policy,
*options.split(),
action,
zone,
]
out = isctest.run.cmd(
ksr_command, log_stdout=True, raise_on_exception=raise_on_exception
)
return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
def check_keys(
keys,
lifetime,
alg=os.environ["DEFAULT_ALGORITHM_NUMBER"],
size=os.environ["DEFAULT_BITS"],
offset=0,
with_state=False,
):
# Check keys that were created.
num = 0
now = KeyTimingMetadata.now()
for key in keys:
# created: from keyfile plus offset
created = key.get_timing("Created") + offset
# active: retired previous key
if num == 0:
active = created
else:
active = retired
# published: dnskey-ttl + publish-safety + propagation
published = active - timedelta(hours=2, minutes=5)
# retired: zsk-lifetime
if lifetime is not None:
retired = active + lifetime
# removed: ttlsig + retire-safety + sign-delay + propagation
removed = retired + timedelta(days=10, hours=1, minutes=5)
else:
retired = None
removed = None
if retired is None or between(now, published, retired):
goal = "omnipresent"
pubdelay = published + timedelta(hours=2, minutes=5)
signdelay = active + timedelta(days=10, hours=1, minutes=5)
if between(now, published, pubdelay):
state_dnskey = "rumoured"
else:
state_dnskey = "omnipresent"
if between(now, active, signdelay):
state_zrrsig = "rumoured"
else:
state_zrrsig = "omnipresent"
else:
goal = "hidden"
state_dnskey = "hidden"
state_zrrsig = "hidden"
with open(key.statefile, "r", encoding="utf-8") as file:
metadata = file.read()
assert f"Algorithm: {alg}" in metadata
assert f"Length: {size}" in metadata
assert "KSK: no" in metadata
assert "ZSK: yes" in metadata
assert f"Published: {published}" in metadata
assert f"Active: {active}" in metadata
if lifetime is not None:
assert f"Retired: {retired}" in metadata
assert f"Removed: {removed}" in metadata
assert f"Lifetime: {int(lifetime.total_seconds())}" in metadata
else:
assert "Lifetime: 0" in metadata
assert "Retired:" not in metadata
assert "Removed:" not in metadata
if with_state:
assert f"GoalState: {goal}" in metadata
assert f"DNSKEYState: {state_dnskey}" in metadata
assert f"ZRRSIGState: {state_zrrsig}" in metadata
assert "KRRSIGState:" not in metadata
assert "DSState:" not in metadata
num += 1
def check_keysigningrequest(out, zsks, start, end):
lines = out.split("\n")
line_no = 0
inception = start
while inception < end:
next_bundle = end + 1
# expect bundle header
assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no]
line_no += 1
# expect zsks
for key in sorted(zsks):
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
removed = key.get_timing("Delete", must_exist=False)
if between(removed, inception, next_bundle):
next_bundle = removed
if published > inception:
continue
if removed is not None and inception >= removed:
continue
# this zsk must be in the ksr
assert key.dnskey_equals(lines[line_no])
line_no += 1
inception = next_bundle
# ksr footer
assert ";; KeySigningRequest 1.0 generated at" in lines[line_no]
line_no += 1
# trailing empty lines
while line_no < len(lines):
assert lines[line_no] == ""
line_no += 1
assert line_no == len(lines)
def check_signedkeyresponse(
out,
zone,
ksks,
zsks,
start,
end,
refresh,
cdnskey=True,
cds="SHA-256",
):
lines = out.split("\n")
line_no = 0
next_bundle = end + 1
inception = start
while inception < end:
# A single signed key response may consist of:
# ;; SignedKeyResponse (header)
# ;; DNSKEY 257 (one per published key in ksks)
# ;; DNSKEY 256 (one per published key in zsks)
# ;; RRSIG(DNSKEY) (one per active key in ksks)
# ;; CDNSKEY (one per published key in ksks)
# ;; RRSIG(CDNSKEY) (one per active key in ksks)
# ;; CDS (one per published key in ksks)
# ;; RRSIG(CDS) (one per active key in ksks)
sigstart = inception - timedelta(hours=1) # clockskew
sigend = inception + timedelta(days=14) # sig-validity
next_bundle = sigend + refresh
# ignore empty lines
while line_no < len(lines):
if lines[line_no] == "":
line_no += 1
else:
break
# expect bundle header
assert f";; SignedKeyResponse 1.0 {inception}" in lines[line_no]
line_no += 1
# expect ksks
for key in sorted(ksks):
published = key.get_timing("Publish")
removed = key.get_timing("Delete", must_exist=False)
if published > inception:
continue
if removed is not None and inception >= removed:
continue
# this ksk must be in the ksr
assert key.dnskey_equals(lines[line_no])
line_no += 1
# expect zsks
for key in sorted(zsks):
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
removed = key.get_timing("Delete", must_exist=False)
if between(removed, inception, next_bundle):
next_bundle = removed
if published > inception:
continue
if removed is not None and inception >= removed:
continue
# this zsk must be in the ksr
assert key.dnskey_equals(lines[line_no])
line_no += 1
# expect rrsig(dnskey)
for key in sorted(ksks):
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
continue
if inactive is not None and inception >= inactive:
continue
# there must be a signature of this ksk
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
rrsig = " ".join(lines[line_no].split())
assert expect in rrsig
line_no += 1
# expect cdnskey
if cdnskey:
for key in sorted(ksks):
published = key.get_timing("Publish")
removed = key.get_timing("Delete", must_exist=False)
if published > inception:
continue
if removed is not None and inception >= removed:
continue
# the cdnskey of this ksk must be in the ksr
assert key.dnskey_equals(lines[line_no], cdnskey=True)
line_no += 1
# expect rrsig(cdnskey)
for key in sorted(ksks):
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
continue
if inactive is not None and inception >= inactive:
continue
# there must be a signature of this ksk
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
rrsig = " ".join(lines[line_no].split())
assert expect in rrsig
line_no += 1
# expect cds
if cds != "":
for key in sorted(ksks):
published = key.get_timing("Publish")
removed = key.get_timing("Delete", must_exist=False)
if published > inception:
continue
if removed is not None and inception >= removed:
continue
# the cds of this ksk must be in the ksr
expected_cds = cds.split(",")
for alg in expected_cds:
assert key.cds_equals(lines[line_no], alg.strip())
line_no += 1
# expect rrsig(cds)
for key in sorted(ksks):
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
continue
if inactive is not None and inception >= inactive:
continue
# there must be a signature of this ksk
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
rrsig = " ".join(lines[line_no].split())
assert expect in rrsig
line_no += 1
inception = next_bundle
# skr footer
assert ";; SignedKeyResponse 1.0 generated at" in lines[line_no]
line_no += 1
# trailing empty lines
while line_no < len(lines):
assert lines[line_no] == ""
line_no += 1
assert line_no == len(lines)
def test_ksr_errors():
# check that 'dnssec-ksr' errors on unknown action
_, err = ksr("common.test", "common", "foobar", raise_on_exception=False)
assert "dnssec-ksr: fatal: unknown command 'foobar'" in err
# check that 'dnssec-ksr keygen' errors on missing end date
_, err = ksr("common.test", "common", "keygen", raise_on_exception=False)
assert "dnssec-ksr: fatal: keygen requires an end date" in err
# check that 'dnssec-ksr keygen' errors on zone with csk
_, err = ksr(
"csk.test", "csk", "keygen", options="-K ns1 -e +2y", raise_on_exception=False
)
assert "dnssec-ksr: fatal: policy 'csk' has no zsks" in err
# check that 'dnssec-ksr request' errors on missing end date
_, err = ksr("common.test", "common", "request", raise_on_exception=False)
assert "dnssec-ksr: fatal: request requires an end date" in err
# check that 'dnssec-ksr sign' errors on missing ksr file
_, err = ksr(
"common.test",
"common",
"sign",
options="-K ns1/offline -i now -e +1y",
raise_on_exception=False,
)
assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in err
def test_ksr_common(servers):
# common test cases (1)
zone = "common.test"
policy = "common"
n = 1
# create ksk
kskdir = "ns1/offline"
out = keygen(zone, policy, kskdir)
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y")
zsks = keystr_to_keylist(out)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
# in the given key directory
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime)
for key in zsks:
privatefile = f"{key.path}.private"
keyfile = f"{key.path}.key"
statefile = f"{key.path}.state"
shutil.copyfile(privatefile, f"{privatefile}.backup")
shutil.copyfile(keyfile, f"{keyfile}.backup")
shutil.copyfile(statefile, f"{statefile}.backup")
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# common test cases (2)
n = 2
# check that 'dnssec-ksr keygen' selects pregenerated keys for
# the same time bundle
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
selected_zsks = keystr_to_keylist(out, zskdir)
assert len(selected_zsks) == 2
for index, key in enumerate(selected_zsks):
assert zsks[index] == key
check_file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup")
check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup")
# check that 'dnssec-ksr keygen' generates only necessary keys for
# overlapping time bundle
out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
overlapping_zsks = keystr_to_keylist(out, zskdir)
assert len(overlapping_zsks) == 4
verbose = err.split()
selected = 0
generated = 0
for output in verbose:
if "Selecting" in output:
selected += 1
if "Generating" in output:
generated += 1
assert selected == 2
assert generated == 2
for index, key in enumerate(overlapping_zsks):
if index < 2:
assert zsks[index] == key
check_file_contents_equal(
f"{key.path}.private", f"{key.path}.private.backup"
)
check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup")
# run 'dnssec-ksr keygen' again with verbosity 0
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
overlapping_zsks2 = keystr_to_keylist(out, zskdir)
assert len(overlapping_zsks2) == 4
check_keys(overlapping_zsks2, lifetime)
for index, key in enumerate(overlapping_zsks2):
assert overlapping_zsks[index] == key
# check that 'dnssec-ksr request' creates correct ksr if the
# interval is shorter
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y")
fname = f"{zone}.ksr.{n}.shorter"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr request' creates correct ksr with new interval
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
until = now + timedelta(days=365 * 2)
check_keysigningrequest(out, overlapping_zsks, now, until)
# check that 'dnssec-ksr request' errors if there are not enough keys
_, err = ksr(
zone,
policy,
"request",
options=f"-K ns1 -i {now} -e +3y",
raise_on_exception=False,
)
error = f"no {zone}/ECDSAP256SHA256 zsk key pair found for bundle"
assert f"dnssec-ksr: fatal: {error}" in err
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +2y"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out,
zone,
ksks,
overlapping_zsks,
now,
until,
refresh,
)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
ns1.rndc(f"skr -import {fname} {zone}", log=False)
# test zone is correctly signed
# - check rndc dnssec -status output
isctest.kasp.check_dnssecstatus(ns1, zone, overlapping_zsks, policy=policy)
# - zone is signed
isctest.kasp.check_zone_is_signed(ns1, zone)
# - dnssec_verify
isctest.kasp.check_dnssec_verify(ns1, zone)
# - check keys
check_keys(overlapping_zsks, lifetime, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks)
def test_ksr_lastbundle(servers):
zone = "last-bundle.test"
policy = "common"
n = 1
# create ksk
kskdir = "ns1/offline"
now = KeyTimingMetadata.now()
offset = -timedelta(days=365)
when = now + offset - timedelta(days=1)
out = keygen(zone, policy, kskdir, when=str(when))
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime, offset=offset)
# check that 'dnssec-ksr request' creates correct ksr
then = zsks[0].get_timing("Created") + offset
until = then + timedelta(days=366)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, then, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1d"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
ns1.rndc(f"skr -import {fname} {zone}", log=False)
# test zone is correctly signed
# - check rndc dnssec -status output
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
# - zone is signed
isctest.kasp.check_zone_is_signed(ns1, zone)
# - dnssec_verify
isctest.kasp.check_dnssec_verify(ns1, zone)
# - check keys
check_keys(zsks, lifetime, offset=offset, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
# check that last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
assert f"zone {zone}/IN (signed): zone_rekey: {warning}" in ns1.log
def test_ksr_inthemiddle(servers):
zone = "in-the-middle.test"
policy = "common"
n = 1
# create ksk
kskdir = "ns1/offline"
now = KeyTimingMetadata.now()
offset = -timedelta(days=365)
when = now + offset - timedelta(days=1)
out = keygen(zone, policy, kskdir, when=str(when))
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 4
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime, offset=offset)
# check that 'dnssec-ksr request' creates correct ksr
then = zsks[0].get_timing("Created")
then = then + offset
until = then + timedelta(days=365 * 2)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, then, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1y"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
ns1.rndc(f"skr -import {fname} {zone}", log=False)
# test zone is correctly signed
# - check rndc dnssec -status output
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
# - zone is signed
isctest.kasp.check_zone_is_signed(ns1, zone)
# - dnssec_verify
isctest.kasp.check_dnssec_verify(ns1, zone)
# - check keys
check_keys(zsks, lifetime, offset=offset, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
# check that no last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
assert f"zone {zone}/IN (signed): zone_rekey: {warning}" not in ns1.log
def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
n = 1
# create ksk
kskdir = "ns1/offline"
now = KeyTimingMetadata.now()
then = now + offset
until = now + end
out = keygen(zone, policy, kskdir, when=str(then))
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# key generation
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
# create request
now = zsks[0].get_timing("Created")
then = now + offset
until = now + end
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
# sign request
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e {until}"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
# add zone
server.rndc(
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
server.rndc(f"skr -import {fname} {zone}", log=False)
# test that rekey logs error
time_remaining = 10
warning = "no available SKR bundle"
line = f"zone {zone}/IN (signed): zone_rekey failure: {warning}"
while time_remaining > 0:
if line not in server.log:
time_remaining -= 1
time.sleep(1)
else:
break
assert line in server.log
def test_ksr_rekey_logs_error(servers):
# check that an SKR that is too old logs error
check_ksr_rekey_logs_error(
servers["ns1"], "past.test", "common", -63072000, -31536000
)
# check that an SKR that is too new logs error
check_ksr_rekey_logs_error(
servers["ns1"], "future.test", "common", 2592000, 31536000
)
def test_ksr_unlimited(servers):
zone = "unlimited.test"
policy = "unlimited"
n = 1
# create ksk
kskdir = "ns1/offline"
out = keygen(zone, policy, kskdir)
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 1
lifetime = None
check_keys(zsks, lifetime)
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365 * 4)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr without cdnskey
out, _ = ksr(
zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.no-cdnskey.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out,
zone,
ksks,
zsks,
now,
until,
refresh,
cdnskey=False,
cds="SHA-1, SHA-256, SHA-384",
)
# check that 'dnssec-ksr sign' creates correct skr without cds
out, _ = ksr(
zone, "no-cds", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.no-cds.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out,
zone,
ksks,
zsks,
now,
until,
refresh,
cds="",
)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.{policy}.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(skrfile, f"ns1/{skrfile}")
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
# test zone is correctly signed
# - check rndc dnssec -status output
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
# - zone is signed
isctest.kasp.check_zone_is_signed(ns1, zone)
# - dnssec_verify
isctest.kasp.check_dnssec_verify(ns1, zone)
# - check keys
check_keys(zsks, lifetime, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
def test_ksr_twotone(servers):
zone = "two-tone.test"
policy = "two-tone"
n = 1
# create ksk
kskdir = "ns1/offline"
out = keygen(zone, policy, kskdir)
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 2
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
# First algorithm keys have a lifetime of 3 months, so there should
# be 4 created keys. Second algorithm keys have a lifetime of 5
# months, so there should be 3 created keys. While only two time
# bundles of 5 months fit into one year, we need to create an extra
# key for the remainder of the bundle. So 7 in total.
assert len(zsks) == 7
zsks_defalg = []
zsks_altalg = []
for zsk in zsks:
alg = zsk.get_metadata("Algorithm")
if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"):
zsks_defalg.append(zsk)
elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"):
zsks_altalg.append(zsk)
assert len(zsks_defalg) == 4
assert len(zsks_altalg) == 3
lifetime = timedelta(days=31 * 3)
check_keys(zsks_defalg, lifetime)
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
size = os.environ.get("ALTERNATIVE_BITS")
lifetime = timedelta(days=31 * 5)
check_keys(zsks_altalg, lifetime, alg, size)
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
skrfile = f"{zone}.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -timedelta(days=5)
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(skrfile, f"ns1/{skrfile}")
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
# test zone is correctly signed
# - check rndc dnssec -status output
isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy)
# - zone is signed
isctest.kasp.check_zone_is_signed(ns1, zone)
# - dnssec_verify
isctest.kasp.check_dnssec_verify(ns1, zone)
# - check keys
lifetime = timedelta(days=31 * 3)
check_keys(zsks_defalg, lifetime, with_state=True)
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
size = os.environ.get("ALTERNATIVE_BITS")
lifetime = timedelta(days=31 * 5)
check_keys(zsks_altalg, lifetime, alg, size, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)