2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-23 02:28:55 +00:00
bind/bin/tests/system/dnssec/tests_validation.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1601 lines
59 KiB
Python
Raw Normal View History

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import re
import shutil
import time
from dns import edns, flags, name, rcode, rdataclass, rdatatype
import pytest
pytest.importorskip("dns", minversion="2.0.0")
import isctest
import isctest.mark
from isctest.util import param
# helper functions
def grep_q(regex, filename):
    """Return True when at least one line of *filename* matches *regex*.

    The file is read as UTF-8, split into lines, and every line is tested
    with ``re.search``, so the pattern may match anywhere within a line.
    """
    with open(filename, "r", encoding="utf-8") as handle:
        lines = handle.read().splitlines()
    return any(re.search(regex, line) for line in lines)
def getfrom(file):
    """Return the UTF-8 text of *file* with surrounding whitespace removed."""
    with open(file, encoding="utf-8") as handle:
        contents = handle.read()
    return contents.strip()
@pytest.mark.requires_zones_loaded("ns2", "ns3")
@pytest.mark.parametrize(
    "qname, qtype",
    [
        param("a.example.", "A"),
        param("rfc2535.example.", "SOA"),
    ],
)
def test_load_transfer(qname, qtype):
    """Check that the zone can be loaded and transferred.

    The same query is sent over TCP to 10.53.0.2 and 10.53.0.3; both
    answers must agree and the first must be NOERROR.
    """
    query = isctest.query.create(qname, qtype)
    first = isctest.query.tcp(query, "10.53.0.2")
    second = isctest.query.tcp(query, "10.53.0.3")
    isctest.check.same_answer(first, second)
    isctest.check.noerror(first)
def test_insecure_glue():
    """Glue for an insecure delegation must show up in the additional section.

    For a query against a validating resolver where the authoritative zone
    is unsigned (insecure delegation), the response is expected to carry the
    name server's address in its additional section.
    """
    query = isctest.query.create("a.insecure.example", "A")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noerror(response)

    # each of the three sections is expected to have a count of one
    for section in (response.answer, response.authority, response.additional):
        isctest.check.rr_count_eq(section, 1)

    glue = response.additional[0]
    assert str(glue.name) == "ns.insecure.example."
    glue_addresses = [str(rdata) for rdata in glue]
    assert "10.53.0.3" in glue_addresses
def test_adflag():
    """Check when the AD flag is (and is not) set in responses."""
    # the authoritative and the recursive answer must carry the same data
    query = isctest.query.create("a.example", "A", dnssec=False)
    auth = isctest.query.tcp(query, "10.53.0.2")
    recursive = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(auth, recursive)

    # the authoritative response must not have AD set
    isctest.check.noadflag(auth)

    # the validating resolver sends AD=1 if the client sent AD=1
    isctest.check.adflag(recursive)

    # ... and AD=0 unless the client sent AD=1
    query = isctest.query.create("a.example", "A", dnssec=False, ad=False)
    recursive = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noadflag(recursive)
def test_secure_root(servers):
    """A query for the secure root validates and secroots lists its key.

    servers: mapping of server names to server objects; only "ns4" is used.
    """
    # a query for a secure root must validate
    query = isctest.query.create(".", "KEY")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noerror(response)
    isctest.check.adflag(response)

    # "rndc secroots" must dump the trusted keys
    ns4 = servers["ns4"]
    key_id = int(getfrom("ns1/managed.key.id"))
    algorithm = os.environ["DEFAULT_ALGORITHM"]
    secroots = ns4.rndc("secroots -", log=False).splitlines()
    assert f"./{algorithm}/{key_id} ; static" in secroots
    # the dump is expected to be exactly ten lines long
    assert len(secroots) == 10
def test_positive_validation_nsec():
    """Positive answers from NSEC-signed zones validate through 10.53.0.4."""
    # positive answer
    query = isctest.query.create("a.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.2")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.adflag(resolved)

    # wildcard
    query = isctest.query.create("a.wild.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.adflag(resolved)
    # the first authority entry must be the wildcard's NSEC ...
    nsec_rrset = resolved.authority[0]
    assert str(nsec_rrset.name) == "*.wild.example."
    assert nsec_rrset.rdtype == rdatatype.NSEC
    # ... whose text form starts with the expected next name ...
    next_names = [str(rdata).partition(" ")[0] for rdata in nsec_rrset]
    assert "z.example." in next_names
    # ... followed by the RRSIG covering that NSEC
    nsec_sig = resolved.authority[1]
    assert nsec_sig.rdtype == rdatatype.RRSIG
    assert nsec_sig.covers == rdatatype.NSEC

    # mixed case
    for qtype in ("a", "txt", "aaaa", "loc"):
        query = isctest.query.create("mixedcase.secure.example", qtype)
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        isctest.check.noerror(resolved)
        isctest.check.adflag(resolved)
def test_positive_validation_nsec3():
    """Positive answers from NSEC3-signed zones validate through 10.53.0.4."""
    # positive answer
    query = isctest.query.create("a.nsec3.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.adflag(resolved)

    # wildcard
    query = isctest.query.create("a.wild.nsec3.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.adflag(resolved)
    isctest.check.rr_count_eq(resolved.authority, 4)

    # unknown NSEC3 hash algorithm
    query = isctest.query.create("nsec3-unknown.example", "SOA", dnssec=False)
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    for response in (reference, resolved):
        isctest.check.noerror(response)
    isctest.check.adflag(resolved)
    isctest.check.rr_count_eq(resolved.answer, 1)
def test_positive_validation_optout():
    """Positive answers from opt-out zones, as seen through 10.53.0.4."""
    # positive answer
    query = isctest.query.create("a.optout.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.adflag(resolved)

    # wildcard: the answer is expected without the AD flag
    query = isctest.query.create("a.wild.optout.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.noadflag(resolved)

    # unknown NSEC3 hash algorithm
    query = isctest.query.create("optout-unknown.example", "SOA", dnssec=False)
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    for response in (reference, resolved):
        isctest.check.noerror(response)
    isctest.check.adflag(resolved)
    isctest.check.rr_count_eq(resolved.answer, 1)
def answer_has(r, rdtype):
    """Return True if the answer section of *r* holds an entry of *rdtype*.

    r: a response object exposing an iterable ``answer`` attribute whose
       items each have an ``rdtype`` attribute.
    rdtype: the record type to look for.

    Returns False when the answer section is empty.
    """
    # use a distinct loop name so the parameter ``r`` is not shadowed, and
    # stop at the first match instead of building a throwaway list
    return any(rrset.rdtype == rdtype for rrset in r.answer)
def test_chain_validation():
    """Validate ANY, CNAME and DNAME responses, including bogus ones.

    The test runs in three phases, in this order:
      1. answers from 10.53.0.2 and 10.53.0.4 are compared and counted;
      2. bogus CNAME/DNAME data is fetched with CD set, then must be
         rejected once CD is cleared;
      3. DNSKEY/KEY lookups through CNAME/DNAME are compared between
         10.53.0.3 and 10.53.0.4.
    """
    # phase 1: (qname, qtype, expected answer count)
    counted_cases = (
        # validation of ANY response: 2 records, 1 NSEC, 3 RRSIGs
        ("foo.example", "ANY", 6),
        # validation of CNAME response: CNAME, TXT, 2 RRSIGs
        ("cname1.example", "TXT", 4),
        # validation of DNAME response: DNAME, TXT, 2 RRSIGs, synth CNAME
        ("foo.dname1.example", "TXT", 5),
        # validation of CNAME response to ANY query: CNAME, NSEC, 2 RRSIGs
        ("cname2.example", "ANY", 4),
        # validation of DNAME response to ANY query: DNAME, RRSIG, synth CNAME
        ("foo.dname2.example", "ANY", 3),
    )
    for qname, qtype, answer_count in counted_cases:
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, "10.53.0.2")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        isctest.check.noerror(resolved)
        isctest.check.rr_count_eq(resolved.answer, answer_count)

    # phase 2: (qname, text expected in answer[0], answer[1], ...)
    bogus_cases = (
        # bad CNAME signature is caught after +CD query
        ("bad-cname.example", ("a.example.", "10.0.0.1")),
        # bad DNAME signature is caught after +CD query
        ("a.bad-dname.example", ("example.", "a.example.", "10.0.0.1")),
    )
    for qname, fragments in bogus_cases:
        query = isctest.query.create(qname, "A", dnssec=False, cd=True)
        # query once with CD to prime the cache
        response = isctest.query.tcp(query, "10.53.0.4")
        # query again with CD, bogus pending data should be returned
        response = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.noerror(response)
        for position, fragment in enumerate(fragments):
            assert fragment in str(response.answer[position])
        # query again without CD, bogus data should be rejected
        query = isctest.query.create(qname, "A", dnssec=False)
        response = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.servfail(response)

    # phase 3: (qname, qtype, alias type, alias expected in the answer?)
    key_cases = (
        # DNSKEY lookup via CNAME
        ("cnameandkey.secure.example", "DNSKEY", rdatatype.CNAME, True),
        # KEY lookup via CNAME
        ("cnameandkey.secure.example", "KEY", rdatatype.CNAME, False),
        # KEY lookup via CNAME (not present)
        ("cnamenokey.secure.example", "KEY", rdatatype.CNAME, False),
        # DNSKEY lookup via DNAME
        ("a.dnameandkey.secure.example", "DNSKEY", rdatatype.DNAME, True),
        # KEY lookup via DNAME
        ("a.dnameandkey.secure.example", "KEY", rdatatype.DNAME, True),
    )
    for qname, qtype, alias_type, alias_expected in key_cases:
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        isctest.check.adflag(resolved)
        assert answer_has(resolved, alias_type) == alias_expected
@isctest.mark.rsasha1
def test_signing_algorithms_rsasha1():
    """Answers from RSASHA1-signed zones validate through 10.53.0.4."""
    zones = (
        # rsasha1 (should work in FIPS mode as we're only validating)
        "a.rsasha1.example",
        # rsasha1 (1024 bits) NSEC
        "a.rsasha1-1024.example",
    )
    for qname in zones:
        query = isctest.query.create(qname, "A")
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        isctest.check.adflag(resolved)
def test_signing_algorithms():
    """Answers signed with each listed key setup validate via 10.53.0.4."""
    zones = (
        # rsasha256
        "a.rsasha256.example",
        # rsasha512
        "a.rsasha512.example",
        # KSK-only DNSKEY
        "a.kskonly.example",
    )
    for qname in zones:
        query = isctest.query.create(qname, "A")
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        isctest.check.adflag(resolved)
def test_private_algorithms(servers):
    """Validation behaviour for zones signed with private (OID) algorithms.

    servers: mapping of server names to server objects; only "ns4" is used,
             to watch its log for the expected validation failure.
    """
    # (qname, AD-flag check applied to the answer from 10.53.0.4)
    cases = (
        # positive answer, private algorithm
        ("a.rsasha256oid.example", isctest.check.adflag),
        # positive answer, unknown private algorithm
        ("a.unknownoid.example", isctest.check.noadflag),
        # positive answer, extra ds for private algorithm
        ("a.extradsoid.example", isctest.check.adflag),
    )
    for qname, check_ad in cases:
        query = isctest.query.create(qname, "A")
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        check_ad(resolved)

    # positive answer, extra ds for unknown private algorithm: the query via
    # 10.53.0.4 must fail and the reason must be logged by ns4
    ns4 = servers["ns4"]
    with ns4.watch_log_from_here() as log_watcher:
        query = isctest.query.create("a.extradsunknownoid.example", "A")
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.noerror(reference)
        isctest.check.servfail(resolved)
        log_watcher.wait_for_line(
            "No DNSKEY for extradsunknownoid.example/DS with PRIVATEOID"
        )
@isctest.mark.extended_ds_digest
def test_private_algorithms_extended_ds():
    """Positive validation with an extra DS using an extended digest type.

    The extra DS refers to an unknown private algorithm; the answer must
    still match the one from 10.53.0.3 and carry the AD flag.
    """
    query = isctest.query.create("a.extended-ds-unknown-oid.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.adflag(resolved)
def test_negative_validation_nsec():
    """Negative answers from an NSEC-signed zone validate via 10.53.0.4."""
    # (qname, qtype, checks applied in order to the answer from 10.53.0.4)
    cases = (
        # nxdomain
        ("q.example", "A", (isctest.check.nxdomain, isctest.check.adflag)),
        # nodata
        (
            "a.example",
            "TXT",
            (
                isctest.check.noerror,
                isctest.check.empty_answer,
                isctest.check.adflag,
            ),
        ),
        # negative wildcard
        ("b.wild.example", "TXT", (isctest.check.noerror, isctest.check.adflag)),
    )
    for qname, qtype, checks in cases:
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, "10.53.0.2")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        for check in checks:
            check(resolved)
def test_negative_validation_nsec3():
    """Negative answers from NSEC3-signed zones, as seen via 10.53.0.4."""
    # (qname, qtype, checks applied in order to the answer from 10.53.0.4)
    cases = (
        # nxdomain
        ("q.nsec3.example", "A", (isctest.check.nxdomain, isctest.check.adflag)),
        # nodata
        (
            "a.nsec3.example",
            "TXT",
            (
                isctest.check.noerror,
                isctest.check.empty_answer,
                isctest.check.adflag,
            ),
        ),
        # negative wildcard
        (
            "b.wild.nsec3.example",
            "TXT",
            (isctest.check.noerror, isctest.check.adflag),
        ),
    )
    for qname, qtype, checks in cases:
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_answer(reference, resolved)
        for check in checks:
            check(resolved)

    # NSEC3 zone with mismatched NSEC3PARAM / NSEC parameters
    query = isctest.query.create("non-exist.badparam", "A")
    response = isctest.query.tcp(query, "10.53.0.2")
    isctest.check.nxdomain(response)

    # negative answer with an unknown NSEC3 hash algorithm does not validate
    query = isctest.query.create("nsec3-unknown.example", "A", dnssec=False)
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noerror(reference)
    isctest.check.servfail(resolved)
def test_excessive_nsec3_iterations():
    """Zones with too many NSEC3 iterations are logged and treated as insecure."""
    # both servers must have logged the excessive iteration count
    for logfile in ("ns2/named.run", "ns3/named.run"):
        assert grep_q(
            "zone too-many-iterations/IN: excessive NSEC3PARAM iterations", logfile
        )

    # fallback to insecure when NSEC3 iterations are too high
    query = isctest.query.create("does-not-exist.too-many-iterations", "A")
    reference = isctest.query.tcp(query, "10.53.0.2")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(reference, resolved)
    isctest.check.noadflag(resolved)
    isctest.check.rr_count_eq(resolved.answer, 0)
    isctest.check.rr_count_eq(resolved.authority, 8)

    # fallback to insecure when NSEC3 iterations are too high (nodata)
    query = isctest.query.create("a.too-many-iterations", "TXT")
    reference = isctest.query.tcp(query, "10.53.0.2")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(reference, resolved)
    isctest.check.noadflag(resolved)
    isctest.check.rr_count_eq(resolved.answer, 0)
    isctest.check.rr_count_eq(resolved.authority, 4)

    # fallback to insecure when NSEC3 iterations are too high (wildcard)
    query = isctest.query.create("wild.a.too-many-iterations", "A")
    reference = isctest.query.tcp(query, "10.53.0.2")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(reference, resolved)
    isctest.check.noadflag(resolved)
    isctest.check.rr_count_eq(resolved.answer, 2)
    isctest.check.rr_count_eq(resolved.authority, 4)
    # the answer must unpack into exactly two entries; only the first is
    # inspected
    address_rrset, _ = resolved.answer
    assert str(address_rrset.name) == "wild.a.too-many-iterations."
    assert str(address_rrset[0]) == "10.0.0.3"

    # fallback to insecure with high NSEC3 iterations (wildcard nodata);
    # the query type is given numerically (100)
    query = isctest.query.create("wild.a.too-many-iterations", 100)
    reference = isctest.query.tcp(query, "10.53.0.2")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(reference, resolved)
    isctest.check.noadflag(resolved)
    isctest.check.rr_count_eq(resolved.authority, 8)
def test_auth_nsec3():
    """NXDOMAIN responses must carry the expected NSEC3 records.

    Each case queries 10.53.0.4 for a nonexistent name and then looks up
    specific NSEC3 owner names in the authority section, checking that the
    record's text (upper-cased) contains the expected hash.
    """
    # every case: (qname, [(NSEC3 owner, expected hash, failure message)])
    cases = (
        # nxdomain response, closest encloser with 0 empty non-terminals
        (
            "b.b.b.b.b.a.nsec3.example",
            (
                # closest encloser (a.nsec3.example):
                (
                    "6OVDUHTN094ML2PV8AN90U0DPU823GH2.nsec3.example.",
                    "7AT0S0RIDCJRFF2M5H5AAV22CSFJBUL4",
                    "NSEC3 missing from AUTHORITY: ",
                ),
                # no QNAME (b.a.nsec3.example/DSPF4R9UKOEPJ9O34E1H4539LSOTL14E)
                (
                    "BEJ5GMQA872JF4DAGQ0R3O5Q7A2O5S9L.nsec3.example.",
                    "EF2S05SGK1IR2K5SKMFIRERGQCLMR18M",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
                # no WILDCARD (*.a.nsec3.example/TFGQ60S97BS31IT1EBEDO63ETM0T5JFA)
                (
                    "R8EVDMNIGNOKME4LH2H90OSP2PRSNJ1Q.nsec3.example.",
                    "VH656EQUD4J02OFVSO4GKOK5D02MS1TL",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
            ),
        ),
        # nxdomain response, closest encloser with 1 ENT
        (
            "b.b.b.b.b.a.a.nsec3.example",
            (
                # closest encloser (a.a.nsec3.example):
                (
                    "NGCJFSOLJUUE27PFNQNJIME4TQ0OU2DH.nsec3.example.",
                    "R8EVDMNIGNOKME4LH2H90OSP2PRSNJ1Q",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
                # noqname (b.a.a.nsec3.example):
                (
                    "R8EVDMNIGNOKME4LH2H90OSP2PRSNJ1Q.nsec3.example.",
                    "VH656EQUD4J02OFVSO4GKOK5D02MS1TL",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
                # no wildcard (*.a.a.nsec3.example/V7JNNDJ4NLRIU195FRB7DLUCSLU4LLFM)
                # is covered by the noqname proof in this case
            ),
        ),
        # nxdomain response, closest encloser with 2 ENTs
        (
            "b.b.b.b.b.a.a.a.nsec3.example",
            (
                # closest encloser (a.a.a.nsec3.example):
                (
                    "H7RHPDCHSVVRAND332F878C8AB6IBJQV.nsec3.example.",
                    "K8IG76R2UPQ13IKFO49L7IB9JRVB6QJI",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
                # noqname (b.a.a.a.nsec3.example/18Q8D89RM8GGRSSOPFRB05QS6VEGB1P4)
                (
                    "0T7VH688AEK0612T69V8692OCMJD50M4.nsec3.example.",
                    "1HARMGSKJH0EBU2EI2OJIKTDPIQA6KBI",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
                # no WILDCARD (*.a.a.a.nsec3.example/8113LDMSEFPUAG4VGFF1C8KLOUT4Q6PH)
                (
                    "7AT0S0RIDCJRFF2M5H5AAV22CSFJBUL4.nsec3.example.",
                    "BEJ5GMQA872JF4DAGQ0R3O5Q7A2O5S9L",
                    "expected NSEC3 missing from AUTHORITY: ",
                ),
            ),
        ),
    )
    for qname, expected_records in cases:
        query = isctest.query.create(qname, "A")
        response = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.nxdomain(response)
        for owner, expected_hash, missing_prefix in expected_records:
            nsec3 = response.get_rrset(
                response.authority,
                name.from_text(owner),
                rdataclass.IN,
                rdatatype.NSEC3,
            )
            assert nsec3, missing_prefix + str(response)
            assert expected_hash in str(nsec3[0]).upper()
def _assert_single_nsec3(res):
    """Assert that *res* carries the expected single-nsec3 NSEC3 record.

    Scans the authority section of *res* for NSEC3 entries whose owner name
    contains the hash 3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN and checks that the
    record's text contains "1 1 1 - <same hash>".  At least one such entry
    must be present.

    Raises AssertionError when no matching NSEC3 is found or when a matching
    one has unexpected contents.
    """
    matched = False
    for rrset in res.authority:
        if rrset.rdtype != rdatatype.NSEC3:
            continue
        # A plain `"HASH" in rrset.name` compares a str against the name's
        # bytes labels and can never be true, which would skip every rrset
        # and leave the assertion below unreachable.  Compare text forms
        # instead, upper-cased because neither the owner name nor the rdata
        # text is guaranteed to be upper case.
        if "3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" not in str(rrset.name).upper():
            continue
        matched = True
        assert "1 1 1 - 3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" in str(rrset[0]).upper()
    # without this the check would pass vacuously when the NSEC3 is absent
    assert matched, "expected NSEC3 missing from AUTHORITY: " + str(res)


def test_negative_validation_optout():
    """Negative answers from opt-out zones, plus rt22007 regression checks."""
    # nxdomain
    msg = isctest.query.create("q.optout.example", "A")
    res1 = isctest.query.tcp(msg, "10.53.0.3")
    res2 = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.same_answer(res1, res2)
    isctest.check.nxdomain(res2)
    isctest.check.noadflag(res2)

    # nodata
    msg = isctest.query.create("a.optout.example", "TXT")
    res1 = isctest.query.tcp(msg, "10.53.0.3")
    res2 = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.same_answer(res1, res2)
    isctest.check.noerror(res2)
    isctest.check.empty_answer(res2)
    isctest.check.adflag(res2)

    # negative wildcard
    msg = isctest.query.create("b.wild.optout.example", "TXT")
    res1 = isctest.query.tcp(msg, "10.53.0.3")
    res2 = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.same_answer(res1, res2)
    isctest.check.noerror(res2)
    isctest.check.noadflag(res2)

    # empty NODATA
    msg = isctest.query.create("empty.optout.example", "TXT")
    res1 = isctest.query.tcp(msg, "10.53.0.3")
    res2 = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.same_answer(res1, res2)
    isctest.check.noerror(res2)
    isctest.check.noadflag(res2)

    # (rt22007 regression tests:)
    # check optout NSEC3 referral with only insecure delegations
    msg = isctest.query.create("delegation.single-nsec3", "A")
    res = isctest.query.tcp(msg, "10.53.0.2")
    isctest.check.noerror(res)
    _assert_single_nsec3(res)

    # check optout NSEC3 NXDOMAIN with only insecure delegations
    msg = isctest.query.create("nonexist.single-nsec3", "A")
    res = isctest.query.tcp(msg, "10.53.0.2")
    isctest.check.nxdomain(res)
    _assert_single_nsec3(res)

    # check optout NSEC3 NODATA with only insecure delegations
    msg = isctest.query.create("single-nsec3", "A")
    res = isctest.query.tcp(msg, "10.53.0.2")
    isctest.check.noerror(res)
    _assert_single_nsec3(res)

    # check negative unknown NSEC3-OPTOUT hash algorithm does not validate
    msg = isctest.query.create("optout-unknown.example", "A", dnssec=False)
    res1 = isctest.query.tcp(msg, "10.53.0.3")
    res2 = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.noerror(res1)
    isctest.check.servfail(res2)
def test_cache(servers):
    """Check cache dump contents, TTL capping and RRSIG/DNSKEY caching.

    servers: mapping of server names to server objects; only "ns4" is used,
             to dump its cache.
    """
    ns4 = servers["ns4"]
    dump_file = "ns4/named_dump.db"

    # key id's must be logged when dumping the cache
    ns4.rndc("dumpdb -cache", log=False)
    assert grep_q("; key id = ", dump_file)

    # the negative cache must show the RRSIG covered type
    assert grep_q("; example. RRSIG NSEC ", dump_file)

    # validated data are not cached longer than originalttl
    query = isctest.query.create("a.ttlpatch.example", "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.rr_count_eq(reference.answer, 2)
    isctest.check.rr_count_eq(resolved.answer, 2)
    assert all(rrset.ttl <= 3600 for rrset in reference.answer)
    assert all(rrset.ttl <= 300 for rrset in resolved.answer)

    # query for a record, then follow it with a query for the
    # corresponding RRSIG, which should be answered from the cache
    query = isctest.query.create("normalthenrrsig.secure.example", "A")
    isctest.query.tcp(query, "10.53.0.4")
    query = isctest.query.create("normalthenrrsig.secure.example", "RRSIG")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.raflag(resolved)

    # direct query for RRSIG: if it's not cached with other records,
    # it should result in an empty response
    query = isctest.query.create("rrsigonly.secure.example", "RRSIG")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.empty_answer(response)
    isctest.check.noraflag(response)

    # a DNSKEY query with no data must still get cached: the TTL of the
    # first authority entry has to change between two queries
    query = isctest.query.create("insecure.example", "DNSKEY")
    earlier = isctest.query.tcp(query, "10.53.0.4")
    time.sleep(1)  # give the TTL time to change
    later = isctest.query.tcp(query, "10.53.0.4")
    if earlier.authority[0].ttl == later.authority[0].ttl:
        # allow one more second before giving up
        time.sleep(1)
        later = isctest.query.tcp(query, "10.53.0.4")
    assert earlier.authority[0].ttl != later.authority[0].ttl
def test_insecure_proof_nsec(servers):
    """Insecurity proofs for unsigned zones below NSEC-signed parents.

    servers: mapping of server names to server objects; only "ns4" is used,
             to flush its cache.
    """

    def ask_both(qname, qtype, reference_addr):
        # send one query to the reference server, then to 10.53.0.4
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, reference_addr)
        resolved = isctest.query.tcp(query, "10.53.0.4")
        return reference, resolved

    # 1-server positive
    reference, resolved = ask_both("a.insecure.example", "A", "10.53.0.3")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.noadflag(resolved)

    # 1-server negative
    reference, resolved = ask_both("q.insecure.example", "A", "10.53.0.3")
    isctest.check.same_answer(reference, resolved)
    isctest.check.nxdomain(resolved)
    isctest.check.noadflag(resolved)

    # 1-server negative with SOA hack
    reference, resolved = ask_both("r.insecure.example", "SOA", "10.53.0.3")
    isctest.check.same_answer(reference, resolved)
    isctest.check.nxdomain(resolved)
    isctest.check.noadflag(resolved)
    soa = resolved.authority[0]
    assert soa.rdtype == rdatatype.SOA
    assert soa.ttl == 0

    # 2-server positive
    reference, resolved = ask_both("a.insecure.secure.example", "A", "10.53.0.2")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.noadflag(resolved)

    # 2-server negative
    reference, resolved = ask_both("q.insecure.secure.example", "A", "10.53.0.2")
    isctest.check.same_answer(reference, resolved)
    isctest.check.nxdomain(resolved)
    isctest.check.noadflag(resolved)

    # 2-server negative with SOA hack; the reference answer is fetched but
    # not compared here
    _, resolved = ask_both("r.insecure.secure.example", "SOA", "10.53.0.2")
    isctest.check.nxdomain(resolved)
    isctest.check.noadflag(resolved)

    # insecurity proof using negative cache
    ns4 = servers["ns4"]
    ns4.rndc("flush", log=False)
    ds_query = isctest.query.create("insecure.example", "DS", cd=True)
    isctest.query.tcp(ds_query, "10.53.0.4")

    def query_and_check_nxdomain():
        probe = isctest.query.create("nonexistent.insecure.example", "A")
        answer = isctest.query.tcp(probe, "10.53.0.4")
        isctest.check.nxdomain(answer)
        return True

    isctest.run.retry_with_timeout(query_and_check_nxdomain, 20)

    # insecure negative response with an unsigned NSEC:
    # first try the auth server...
    query = isctest.query.create("nsec-rrsigs-stripped", "TXT")
    auth = isctest.query.udp(query, "10.53.0.10")
    isctest.check.noerror(auth)
    isctest.check.empty_answer(auth)
    isctest.check.rr_count_eq(auth.authority, 2)
    isctest.check.rr_count_eq(auth.additional, 0)
    # make sure there's no RRSIG(NSEC)
    for rrset in auth.authority:
        assert not (
            rrset.rdtype == rdatatype.RRSIG and rrset.covers == rdatatype.NSEC
        )
    # now try the resolver
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(auth, resolved)
    isctest.check.noadflag(resolved)
def test_insecure_proof_nsec3():
    """Insecurity proofs for an unsigned zone below an NSEC3-signed parent."""
    # (qname, qtype, rcode check applied to the answer from 10.53.0.4)
    cases = (
        # 1-server
        ("a.insecure.nsec3.example", "A", isctest.check.noerror),
        # 1-server negative
        ("q.insecure.nsec3.example", "A", isctest.check.nxdomain),
    )
    for qname, qtype, check_rcode in cases:
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_data(reference, resolved)
        check_rcode(resolved)
        isctest.check.noadflag(resolved)

    # 1-server negative with SOA hack: the first authority entry must be a
    # SOA with a TTL of zero
    query = isctest.query.create("r.insecure.nsec3.example", "SOA")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(reference, resolved)
    isctest.check.nxdomain(resolved)
    isctest.check.noadflag(resolved)
    soa = resolved.authority[0]
    assert soa.rdtype == rdatatype.SOA
    assert soa.ttl == 0
def test_insecure_proof_optout():
    """Insecurity proofs for an unsigned zone below an opt-out parent."""
    # (qname, qtype, rcode check applied to the answer from 10.53.0.4)
    cases = (
        # 1-server
        ("a.insecure.optout.example", "A", isctest.check.noerror),
        # 1-server negative
        ("q.insecure.optout.example", "A", isctest.check.nxdomain),
    )
    for qname, qtype, check_rcode in cases:
        query = isctest.query.create(qname, qtype)
        reference = isctest.query.tcp(query, "10.53.0.3")
        resolved = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.same_data(reference, resolved)
        check_rcode(resolved)
        isctest.check.noadflag(resolved)

    # 1-server negative with SOA hack: the first authority entry must be a
    # SOA with a TTL of zero
    query = isctest.query.create("r.insecure.optout.example", "SOA")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_data(reference, resolved)
    isctest.check.nxdomain(resolved)
    isctest.check.noadflag(resolved)
    soa = resolved.authority[0]
    assert soa.rdtype == rdatatype.SOA
    assert soa.ttl == 0
def test_below_cname():
    """Zones located below a CNAME must resolve through 10.53.0.4."""
    # (qname, AD-flag check, expected answer count)
    cases = (
        # insecure zone below a cname resolves
        ("insecure.below-cname.example", isctest.check.noadflag, 1),
        # secure zone below a cname resolves and validates
        ("secure.below-cname.example", isctest.check.adflag, 2),
    )
    for qname, check_ad, answer_count in cases:
        query = isctest.query.create(qname, "SOA")
        response = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.noerror(response)
        check_ad(response)
        isctest.check.rr_count_eq(response.answer, answer_count)
@pytest.mark.parametrize(
    "qname",
    [
        "a.secure.example",  # NSEC/NSEC
        "a.nsec3.example",  # NSEC/NSEC3
        "a.optout.example",  # NSEC/OPTOUT
        "a.secure.nsec3.example",  # NSEC3/NSEC
        "a.nsec3.nsec3.example",  # NSEC3/NSEC3
        "a.optout.nsec3.example",  # NSEC3/OPTOUT
        "a.secure.optout.example",  # OPTOUT/NSEC
        "a.nsec3.optout.example",  # OPTOUT/NSEC3
        "a.optout.optout.example",  # OPTOUT/OPTOUT
    ],
)
def test_positive_validation_multistage(qname):
    """A positive answer validates for each parent/child denial combination.

    qname: the name to query for an A record; the trailing comment on each
           parameter names the combination it exercises.
    """
    query = isctest.query.create(qname, "A")
    reference = isctest.query.tcp(query, "10.53.0.3")
    resolved = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(reference, resolved)
    isctest.check.noerror(resolved)
    isctest.check.adflag(resolved)
def test_validation_recovery(servers):
    """Check that 10.53.0.4 recovers once ns2 reloads properly signed zones.

    Each scenario first primes the cache with a CD query, then swaps in the
    "<zone>.db.next" file on ns2, reloads the zone and queries again.
    """
    ns2 = servers["ns2"]
    ns4 = servers["ns4"]
    resolver = "10.53.0.4"

    def reload_signed(zone):
        # install the ".db.next" file as the signed zone file, reload the
        # zone on ns2 and wait until the load is logged
        shutil.copyfile(f"ns2/{zone}.db.next", f"ns2/{zone}.db.signed")
        with ns2.watch_log_from_here() as watcher:
            ns2.rndc(f"reload {zone}", log=False)
            watcher.wait_for_line(f"zone {zone}/IN: loaded serial 2000042408")

    # recovery from a spoofed server address:
    # prime the cache with spoofed address records...
    query = isctest.query.create("target.peer-ns-spoof", "A", cd=True)
    response = isctest.query.tcp(query, resolver)
    isctest.check.servfail(response)
    ns4.rndc("dumpdb", log=False)
    # NOTE(review): the boolean returned by grep_q() is discarded, so this
    # call does not enforce anything about the dump file -- confirm whether
    # an assert was intended here.
    grep_q("10.53.0.100", "ns4/named_dump.db")
    # ...then reload the server with a properly signed zone...
    reload_signed("peer.peer-ns-spoof")
    # ...and check we can resolve with the correct server address
    query = isctest.query.create("test.target.peer-ns-spoof", "TXT")
    response = isctest.query.tcp(query, resolver)
    isctest.check.nxdomain(response)
    isctest.check.adflag(response)

    # recovery from a stripped DNSKEY RRSIG:
    # prime the cache...
    query = isctest.query.create("dnskey-rrsigs-stripped", "DNSKEY", cd=True)
    response = isctest.query.tcp(query, resolver)
    isctest.check.noerror(response)
    isctest.check.rr_count_eq(response.answer, 2)
    # ...then reload the server with a properly signed zone...
    reload_signed("dnskey-rrsigs-stripped")
    # ...and check that the name now resolves and validates
    query = isctest.query.create("b.dnskey-rrsigs-stripped", "A")
    ns2_res = isctest.query.tcp(query, "10.53.0.2")
    ns4_res = isctest.query.tcp(query, resolver)
    isctest.check.same_answer(ns2_res, ns4_res)
    isctest.check.adflag(ns4_res)

    # recovery from a stripped DS RRSIG:
    # prime the cache...
    query = isctest.query.create("child.ds-rrsigs-stripped", "DS", cd=True)
    response = isctest.query.tcp(query, resolver)
    isctest.check.noerror(response)
    isctest.check.rr_count_eq(response.answer, 1)
    # ...then reload the server with a properly signed zone...
    reload_signed("ds-rrsigs-stripped")
    # ...and check that the name now resolves and validates
    query = isctest.query.create("b.child.ds-rrsigs-stripped", "A")
    ns2_res = isctest.query.tcp(query, "10.53.0.2")
    ns4_res = isctest.query.tcp(query, resolver)
    isctest.check.same_answer(ns2_res, ns4_res)
    isctest.check.adflag(ns4_res)

    # recovery with mismatching NS: both CD queries (without and with the
    # default "dnssec" setting) expect the same unvalidated response
    ns4.rndc("flush", log=False)
    for create_kwargs in ({"dnssec": False, "cd": True}, {"cd": True}):
        query = isctest.query.create("inconsistent", "NS", **create_kwargs)
        response = isctest.query.tcp(query, resolver)
        isctest.check.noadflag(response)
        isctest.check.rr_count_eq(response.answer, 1)
        isctest.check.rr_count_eq(response.additional, 1)
    # without CD the response must be validated
    query = isctest.query.create("inconsistent", "NS")
    response = isctest.query.tcp(query, resolver)
    isctest.check.adflag(response)
    isctest.check.rr_count_eq(response.answer, 3)
    isctest.check.rr_count_eq(response.additional, 0)
def test_failed_validation():
    """Check that names which cannot be validated yield SERVFAIL."""
    failing_names = (
        "a.bogus.example",  # bogus zone
        "a.b.keyless.example",  # missing key record
    )
    for qname in failing_names:
        query = isctest.query.create(qname, "A")
        response = isctest.query.tcp(query, "10.53.0.4")
        isctest.check.servfail(response)
def test_revoked_key():
    """Check that validation succeeds if a revoked key is encountered."""
    query = isctest.query.create("revkey.example", "SOA")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noerror(response)
    isctest.check.adflag(response)
def test_standby_key():
    """Check that a chain with one active and one inactive KSK is secure."""
    query = isctest.query.create("a.lazy-ksk", "A")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noerror(response)
    isctest.check.adflag(response)
def test_transitions():
    """Check validation results for zones in the middle of a transition."""
    resolver = "10.53.0.4"

    # a zone finishing the transition from one algorithm to another
    # validates as secure
    query = isctest.query.create("algroll", "NS")
    response = isctest.query.tcp(query, resolver)
    isctest.check.noerror(response)
    isctest.check.adflag(response)

    # a zone in transition to signed yields an insecure answer
    query = isctest.query.create("inprogress", "A")
    response = isctest.query.tcp(query, resolver)
    isctest.check.noerror(response)
    isctest.check.noadflag(response)
    # the unpacking requires exactly two entries in the answer section
    address_rrset, _ = response.answer
    first_rdata = address_rrset[0]
    assert str(first_rdata) == "10.53.0.10"
def test_validating_forwarder(servers):
    """Check the behavior of the validating forwarder at 10.53.0.9."""
    ns9 = servers["ns9"]
    ns4 = servers["ns4"]
    forwarder = "10.53.0.9"

    # validating forwarder behavior with mismatching NS
    ns4.rndc("flush", log=False)
    query = isctest.query.create("inconsistent", "NS", dnssec=False, cd=True)
    response = isctest.query.tcp(query, forwarder)
    isctest.check.noerror(response)
    isctest.check.rr_count_eq(response.answer, 1)
    isctest.check.rr_count_eq(response.additional, 0)
    isctest.check.noadflag(response)

    query = isctest.query.create("inconsistent", "NS", cd=True)
    response = isctest.query.tcp(query, forwarder)
    isctest.check.rr_count_eq(response.additional, 0)
    isctest.check.noadflag(response)
    isctest.check.rr_count_eq(response.answer, 1)
    isctest.check.rr_count_eq(response.authority, 0)
    # (the additional section is checked a second time, as in the original)
    isctest.check.rr_count_eq(response.additional, 0)

    # resend the same message with the CD bit cleared
    query.flags = query.flags & ~flags.CD
    response = isctest.query.tcp(query, forwarder)
    isctest.check.rr_count_eq(response.answer, 3)
    isctest.check.rr_count_eq(response.authority, 0)
    isctest.check.rr_count_eq(response.additional, 0)
    isctest.check.adflag(response)

    # validating forwarder sends CD to validate with a local trust anchor:
    # 10.53.0.4 fails the query, 10.53.0.9 answers it securely
    ns4.rndc("flush", log=False)
    query = isctest.query.create("localkey.example", "SOA")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.servfail(response)
    with ns9.watch_log_from_here() as watcher:
        response = isctest.query.tcp(query, forwarder)
        isctest.check.noerror(response)
        isctest.check.adflag(response)
        watcher.wait_for_line("status: SERVFAIL")
def test_expired_signatures(servers):
    """Check handling of expired, future and about-to-expire signatures.

    Covers: expired and not-yet-valid RRSIGs fail validation (SERVFAIL, no
    AD, matching extended DNS error and log line), a dynamic zone with
    future signatures is re-signed on load, and TTLs are capped at the RRSIG
    expiry time.
    """
    ns4 = servers["ns4"]

    # check expired signatures do not validate; first make sure 10.53.0.3
    # really serves the RRSIG(SOA) in its answer section
    msg = isctest.query.create("expired.example", "SOA")
    res = isctest.query.tcp(msg, "10.53.0.3")
    rrsig = res.get_rrset(
        res.answer,
        name.from_text("expired.example."),
        rdataclass.IN,
        rdatatype.RRSIG,
        rdatatype.SOA,
    )
    # the lookup above is in the ANSWER section, so say so on failure and
    # show what was actually received
    assert rrsig, f"expected RRSIG(SOA) missing from ANSWER: {res.answer}"
    isctest.check.rr_count_eq(res.answer, 2)
    res = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.servfail(res)
    isctest.check.noadflag(res)
    # the response object only has extended_errors() in some environments
    if hasattr(res, "extended_errors"):
        assert res.extended_errors()[0].code == edns.EDECode.SIGNATURE_EXPIRED
    assert grep_q("expired.example/.*: RRSIG has expired", "ns4/named.run")

    # check future signatures do not validate
    msg = isctest.query.create("future.example", "SOA")
    res = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.servfail(res)
    isctest.check.noadflag(res)
    if hasattr(res, "extended_errors"):
        assert res.extended_errors()[0].code == edns.EDECode.SIGNATURE_NOT_YET_VALID
    assert grep_q(
        "future.example/.*: RRSIG validity period has not begun", "ns4/named.run"
    )

    # check that a dynamic zone with future signatures is re-signed on load
    msg = isctest.query.create("managed-future.example", "A")
    res = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.adflag(res)
    isctest.check.noerror(res)

    # test TTL is capped at RRSIG expiry time: in the answer section (SOA)
    # and in the additional section (NS, MX).  After a flush, the CD query
    # is sent first, then the validating query.
    for qtype, section in (
        ("SOA", "answer"),
        ("NS", "additional"),
        ("MX", "additional"),
    ):
        ns4.rndc("flush", log=False)
        msg = isctest.query.create("expiring.example", qtype, cd=True)
        res1 = isctest.query.tcp(msg, "10.53.0.4")
        msg = isctest.query.create("expiring.example", qtype)
        res2 = isctest.query.tcp(msg, "10.53.0.4")
        for rrset in getattr(res1, section):
            assert rrset.ttl <= 3600
        for rrset in getattr(res2, section):
            assert rrset.ttl <= 60
def test_accept_expired(servers, templates):
    """Check TTL limits once ns4 is reconfigured with "accept_expired".

    For each case the cache is flushed, a CD query is sent first and then a
    validating query; the TTLs in the relevant section of the two responses
    must not exceed 3600 and 120 seconds respectively.
    """
    ns4 = servers["ns4"]
    templates.render("ns4/named.conf", {"accept_expired": True})
    ns4.reconfigure(log=False)

    cases = (
        # TTL of about-to-expire rrsets with accept-expired
        ("expiring.example", "SOA", "answer"),
        # TTL is capped at RRSIG expiry time in the additional section
        # with accept-expired
        ("expiring.example", "MX", "additional"),
        # TTL of expired rrsets with accept-expired; an SOA query has its
        # records in the answer section, so that is the section to inspect
        # (the additional section was being checked here by mistake)
        ("expired.example", "SOA", "answer"),
    )
    for qname, qtype, section in cases:
        ns4.rndc("flush", log=False)
        msg = isctest.query.create(qname, qtype)
        msg.flags |= flags.CD
        res1 = isctest.query.tcp(msg, "10.53.0.4")
        msg = isctest.query.create(qname, qtype)
        res2 = isctest.query.tcp(msg, "10.53.0.4")
        for rrset in getattr(res1, section):
            assert rrset.ttl <= 3600
        for rrset in getattr(res2, section):
            assert rrset.ttl <= 120
def test_casing():
    """Check the case of the signer name in RRSIG(SOA) from 10.53.0.4."""
    resolver = "10.53.0.4"

    # legacy upper-case signer name validation
    query = isctest.query.create("upper.example", "SOA")
    response = isctest.query.tcp(query, resolver)
    isctest.check.adflag(response)
    # the unpacking requires exactly two entries in the answer section
    _, rrsig_rrset = response.answer
    assert rrsig_rrset.rdtype == rdatatype.RRSIG
    assert rrsig_rrset.covers == rdatatype.SOA
    assert "UPPER.EXAMPLE." in str(rrsig_rrset[0])

    # the signer name is lower-cased even when the query name is upper-case
    query = isctest.query.create("LOWER.EXAMPLE", "SOA")
    response = isctest.query.tcp(query, resolver)
    _, rrsig_rrset = response.answer
    assert rrsig_rrset.rdtype == rdatatype.RRSIG
    assert rrsig_rrset.covers == rdatatype.SOA
    assert "lower.example." in str(rrsig_rrset[0])
def test_broken_servers():
    """Check responses involving the server at 10.53.0.6."""
    ns6_addr = "10.53.0.6"
    ns4_addr = "10.53.0.4"

    # a non-cacheable NODATA and a non-cacheable NXDOMAIN both work: the
    # response from 10.53.0.6 has an empty authority section, and 10.53.0.4
    # still returns the expected rcode
    for qname, check_rcode in (
        ("a.nosoa.secure.example", isctest.check.noerror),
        ("b.nosoa.secure.example", isctest.check.nxdomain),
    ):
        query = isctest.query.create(qname, "TXT")
        ns6_res = isctest.query.tcp(query, ns6_addr)
        isctest.check.rr_count_eq(ns6_res.authority, 0)
        ns4_res = isctest.query.tcp(query, ns4_addr)
        check_rcode(ns4_res)

    # split RRSIGs are handled
    query = isctest.query.create("split-rrsig", "SOA")
    response = isctest.query.tcp(query, ns6_addr)
    soa_rrset, _ = response.answer
    assert soa_rrset[0].serial > 1

    # not-at-zone-apex RRSIG(SOA) rrsets are removed
    query = isctest.query.create("split-rrsig", "AXFR")
    response = isctest.query.tcp(query, ns6_addr)
    assert not any(
        str(rrset.name) == "not-at-zone-apex.split-rrsig."
        and rrset.rdtype == rdatatype.RRSIG
        and rrset.covers == rdatatype.SOA
        for rrset in response.answer
    )

    # validation with missing nearest encloser proof
    query = isctest.query.create("b.c.d.optout-tld", "DS")
    response = isctest.query.tcp(query, ns6_addr)
    nsec3_count = sum(1 for rrset in response.authority if rrset.rdtype == rdatatype.NSEC3)
    assert nsec3_count == 2

    query = isctest.query.create("b.c.d.optout-tld", "A")
    response = isctest.query.tcp(query, ns6_addr)
    nsec3_count = sum(1 for rrset in response.authority if rrset.rdtype == rdatatype.NSEC3)
    assert nsec3_count == 1
    # NOTE(review): the same query is sent to 10.53.0.6 a second time before
    # the rcode/AD checks -- confirm whether another server was intended.
    response = isctest.query.tcp(query, ns6_addr)
    isctest.check.noerror(response)
    isctest.check.noadflag(response)

    query = isctest.query.create("optout-tld", "SOA")
    response = isctest.query.tcp(query, ns6_addr)
    isctest.check.noadflag(response)
def test_pending_ds(servers):
    """Check resolution when the DS has a "pending" negative cache entry.

    A query against a validating resolver must succeed when there is a
    negative cache entry with trust level "pending" for the DS.  The cache
    is primed with a +cd DS query to produce the negative cache entry, then
    a query is sent that uses that entry as part of the validation process.
    """
    # the server handle has to come from the "servers" fixture; without it
    # the name "ns4" is undefined and the test dies with a NameError
    ns4 = servers["ns4"]
    ns4.rndc("flush", log=False)

    # prime the negative cache entry with a CD query
    msg = isctest.query.create("insecure.example", "DS", cd=True)
    res = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.noerror(res)
    isctest.check.rr_count_eq(res.authority, 4)

    # this query relies on the cached entry during validation
    msg = isctest.query.create("a.insecure.example", "A")
    res = isctest.query.tcp(msg, "10.53.0.4")
    isctest.check.noerror(res)
    isctest.check.rr_count_eq(res.answer, 1)
    isctest.check.rr_count_eq(res.authority, 1)
    isctest.check.noadflag(res)
def test_trust_anchors(servers, templates):
    """DNSSEC tests for unsupported, disabled and revoked trust anchors.

    ns5 is reconfigured with a bunch of trust anchors.  Some of them are
    good (enabled.managed, enabled.trusted, secure.managed, secure.trusted),
    and some of them are bad (disabled.managed, revoked.managed,
    unsupported.managed, disabled.trusted, revoked.trusted,
    unsupported.trusted).  The bad trust anchors are expected to be ignored;
    every name is queried at 10.53.0.3 and at 10.53.0.5.
    """
    ns5 = servers["ns5"]
    templates.render("ns5/named.conf", {"many_anchors": True})
    ns5.reconfigure(log=False)

    def query_both(qname):
        # send the same A query to 10.53.0.3 and then to 10.53.0.5
        query = isctest.query.create(qname, "A")
        ns3_res = isctest.query.tcp(query, "10.53.0.3")
        ns5_res = isctest.query.tcp(query, "10.53.0.5")
        return ns3_res, ns5_res

    # keys with unsupported/disabled algorithms (and revoked keys) are
    # ignored; look for the corresponding lines in the logfile.
    # NOTE(review): the boolean returned by grep_q() is discarded, so none
    # of these patterns is actually enforced -- confirm whether asserts
    # were intended and whether the wording matches what named logs.
    ignored_anchor_patterns = (
        "ignoring static-key for 'disabled.trusted.': algorithm is disabled",
        "ignoring static-key for 'disabled.managed.': algorithm is disabled",
        "ignoring static-key for 'unsupported.trusted.': algorithm is unsupported",
        "ignoring static-key for 'unsupported.managed.': algorithm is unsupported",
        "ignoring static-key for 'revoked.trusted.': bad key type",
        "ignoring static-key for 'revoked.managed.': bad key type",
    )
    for pattern in ignored_anchor_patterns:
        grep_q(pattern, "ns5/named.run")

    # a key with a supported algorithm validates as secure
    for qname in ("a.secure.trusted", "a.secure.managed"):
        ns3_res, ns5_res = query_both(qname)
        isctest.check.noerror(ns3_res)
        isctest.check.noerror(ns5_res)
        isctest.check.adflag(ns5_res)
        if hasattr(ns5_res, "extended_errors"):
            assert not ns5_res.extended_errors()

    # an unsupported signing algorithm yields insecure
    for qname in ("a.unsupported.trusted", "a.unsupported.managed"):
        ns3_res, ns5_res = query_both(qname)
        isctest.check.noerror(ns3_res)
        if hasattr(ns5_res, "extended_errors"):
            first_ede = ns5_res.extended_errors()[0]
            assert first_ede.code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
        isctest.check.noerror(ns5_res)
        isctest.check.noadflag(ns5_res)

    # a disabled signing algorithm yields insecure
    for qname in ("a.disabled.trusted", "a.disabled.managed"):
        ns3_res, ns5_res = query_both(qname)
        isctest.check.noerror(ns3_res)
        isctest.check.noerror(ns5_res)
        isctest.check.noadflag(ns5_res)

    # a zone signed with an algorithm that's disabled for some other
    # domain, but not for this one, validates as secure.
    # "enabled.trusted." and "enabled.managed." do not match the
    # "disable-algorithms" option, so no special rules apply. (static)
    for qname in ("a.enabled.trusted", "a.enabled.managed"):
        ns3_res, ns5_res = query_both(qname)
        isctest.check.noerror(ns3_res)
        isctest.check.noerror(ns5_res)
        isctest.check.adflag(ns5_res)

    # a revoked trust anchor is ignored when configured; check that
    # this yields insecure.
    for qname in ("a.revoked.trusted", "a.revoked.managed"):
        ns3_res, ns5_res = query_both(qname)
        isctest.check.noerror(ns3_res)
        isctest.check.noerror(ns5_res)
        isctest.check.noadflag(ns5_res)
def test_unknown_algorithms():
    """Check validation results for unknown and unsupported algorithms."""
    ns3_addr = "10.53.0.3"
    ns4_addr = "10.53.0.4"
    # extended DNS error codes referenced below
    ede_dnskey_alg = edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
    ede_ds_digest = edns.EDECode.UNSUPPORTED_DS_DIGEST_TYPE

    # unknown DNSKEY algorithm validates as insecure
    query = isctest.query.create("dnskey-unknown.example", "A", dnssec=False)
    ns3_res = isctest.query.tcp(query, ns3_addr)
    ns4_res = isctest.query.tcp(query, ns4_addr)
    isctest.check.noerror(ns3_res)
    isctest.check.noerror(ns4_res)
    isctest.check.noadflag(ns4_res)

    # unsupported DNSKEY algorithms are in the DNSKEY RRsets
    query = isctest.query.create("dnskey-unsupported.example", "DNSKEY")
    response = isctest.query.tcp(query, ns3_addr)
    isctest.check.noerror(response)
    query = isctest.query.create("dnskey-unsupported-2.example", "DNSKEY")
    response = isctest.query.tcp(query, ns3_addr)
    isctest.check.noerror(response)
    answer_texts = [str(rrset) for rrset in response.answer]
    assert any("257 3 255" in text for text in answer_texts)

    # unsupported DNSKEY algorithm validates as insecure
    query = isctest.query.create("dnskey-unsupported.example", "DNSKEY")
    response = isctest.query.tcp(query, ns4_addr)
    isctest.check.noerror(response)
    isctest.check.noadflag(response)
    if hasattr(response, "extended_errors"):
        assert response.extended_errors()[0].code == ede_dnskey_alg

    # DNSKEY with an unsupported reserve key validates
    query = isctest.query.create("dnskey-unsupported-2.example", "DNSKEY")
    response = isctest.query.tcp(query, ns4_addr)
    isctest.check.noerror(response)
    isctest.check.adflag(response)

    # EDE code 2 for unsupported DS digest algorithm
    query = isctest.query.create("a.ds-unsupported.example", "A")
    response = isctest.query.tcp(query, ns4_addr)
    if hasattr(response, "extended_errors"):
        assert response.extended_errors()[0].code == ede_ds_digest

    # EDE code 1 for bad algorithm mnemonic
    query = isctest.query.create("badalg.secure.example", "A")
    response = isctest.query.tcp(query, ns4_addr)
    isctest.check.noadflag(response)
    if hasattr(response, "extended_errors"):
        assert response.extended_errors()[0].code == ede_dnskey_alg

    # both EDE code 1 and 2 for unsupported digest on one DNSKEY
    # and unsupported algorithm on the other
    query = isctest.query.create("a.digest-alg-unsupported.example", "A")
    response = isctest.query.tcp(query, ns4_addr)
    isctest.check.noadflag(response)
    if hasattr(response, "extended_errors"):
        seen_codes = {ede.code for ede in response.extended_errors()}
        assert ede_dnskey_alg in seen_codes
        assert ede_ds_digest in seen_codes

    # unknown DNSKEY algorithm + unknown NSEC3 hash algorithm
    # validates as insecure
    query = isctest.query.create("dnskey-nsec3-unknown.example", "A")
    ns3_res = isctest.query.tcp(query, ns3_addr)
    ns4_res = isctest.query.tcp(query, ns4_addr)
    isctest.check.noerror(ns3_res)
    isctest.check.noerror(ns4_res)
    isctest.check.noadflag(ns4_res)
###################################
##### BEGIN MANAGED KEY TESTS #####
###################################
def test_switch_managed(servers, templates):
    """Switch ns4 to an initializing trust anchor instead of a static one."""
    ns4 = servers["ns4"]
    # no managed-keys journal may exist before the switch
    assert not os.path.exists("ns4/managed-keys.bind.jnl")
    shutil.copyfile("ns4/managed-keys.bind.in", "ns4/managed-keys.bind")
    templates.render("ns4/named.conf", {"managed_key": True})
    ns4.reconfigure(log=False)
def test_secure_root_managed(servers):
    """Check secure root validation and the "rndc secroots" dump on ns4."""
    # a query for a secure root validates
    query = isctest.query.create(".", "KEY")
    response = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.noerror(response)
    isctest.check.adflag(response)

    # "rndc secroots" dumps the trusted keys: the dump must contain a line
    # for the managed root key and be exactly ten lines long
    ns4 = servers["ns4"]
    key_id = int(getfrom("ns1/managed.key.id"))
    algorithm = os.environ["DEFAULT_ALGORITHM"]
    wanted_line = f"./{algorithm}/{key_id} ; managed"
    secroots_lines = ns4.rndc("secroots -", log=False).splitlines()
    assert wanted_line in secroots_lines
    assert len(secroots_lines) == 10
def test_positive_validation_nsec_managed():
    """Check that a.example gets the same answer via 10.53.0.4, with AD."""
    query = isctest.query.create("a.example", "A")
    ns2_res = isctest.query.tcp(query, "10.53.0.2")
    ns4_res = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(ns2_res, ns4_res)
    isctest.check.adflag(ns4_res)
def test_positive_validation_nsec3_managed():
    """Check that a.nsec3.example validates via 10.53.0.4 (NOERROR, AD)."""
    query = isctest.query.create("a.nsec3.example", "A")
    ns3_res = isctest.query.tcp(query, "10.53.0.3")
    ns4_res = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(ns3_res, ns4_res)
    isctest.check.noerror(ns4_res)
    isctest.check.adflag(ns4_res)
def test_positive_validation_optout_managed():
    """Check that a.optout.example gets the same answer via 10.53.0.4, with AD."""
    query = isctest.query.create("a.optout.example", "A")
    ns3_res = isctest.query.tcp(query, "10.53.0.3")
    ns4_res = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(ns3_res, ns4_res)
    isctest.check.adflag(ns4_res)
def test_negative_validation_nsec_managed():
    """Check that the NXDOMAIN for q.example validates via 10.53.0.4."""
    query = isctest.query.create("q.example", "A")
    ns2_res = isctest.query.tcp(query, "10.53.0.2")
    ns4_res = isctest.query.tcp(query, "10.53.0.4")
    isctest.check.same_answer(ns2_res, ns4_res)
    isctest.check.nxdomain(ns4_res)
    isctest.check.adflag(ns4_res)
def test_ds_managed():
    """Check DS queries for the root and for an RFC 1918 empty zone."""
    resolver = "10.53.0.4"

    # root DS queries validate
    query = isctest.query.create(".", "DS")
    ns1_res = isctest.query.tcp(query, "10.53.0.1")
    ns4_res = isctest.query.tcp(query, resolver)
    isctest.check.same_data(ns1_res, ns4_res)
    isctest.check.adflag(ns4_res)
    isctest.check.noerror(ns4_res)

    # DS queries succeed at RFC 1918 empty zone
    query = isctest.query.create("10.in-addr.arpa", "DS")
    ns2_res = isctest.query.tcp(query, "10.53.0.2")
    ns4_res = isctest.query.tcp(query, resolver)
    isctest.check.same_data(ns2_res, ns4_res)
    isctest.check.noerror(ns4_res)
def test_keydata_storage(servers):
    """Sync managed keys on ns4 and look for KEYDATA in managed-keys.bind."""
    servers["ns4"].rndc("managed-keys sync", log=False)
    # patterns handed to the watcher, which scans the file from its start
    wanted = ["KEYDATA", "next refresh:"]
    with isctest.log.WatchLogFromStart("ns4/managed-keys.bind") as watcher:
        watcher.wait_for_line(wanted)
############################################
##### BEGIN MULTIVIEW VALIDATION TESTS #####
############################################
def test_insecure_staticstub_delegation(servers, templates):
    """Check insecure delegation between static-stub zones.

    ns4 is reconfigured with the "multi_view" configuration, then NS queries
    for both zones must end up with NOERROR.  Each query is retried up to
    five times, one second apart, while the server still answers SERVFAIL.
    """
    ns4 = servers["ns4"]
    templates.render("ns4/named.conf", {"multi_view": True})
    ns4.reconfigure(log=False)

    # check insecure delegation between static-stub zones
    for qname in ("insecure.secure.example", "secure.example"):
        msg = isctest.query.create(qname, "NS")
        for _ in range(5):
            res = isctest.query.tcp(msg, "10.53.0.4")
            # stop retrying as soon as the server gives a real answer
            if res.rcode() != rcode.SERVFAIL:
                break
            time.sleep(1)
        # checked after the loop so that a server which keeps returning
        # SERVFAIL on every attempt fails the test instead of passing it
        isctest.check.noerror(res)