2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-09-01 15:05:23 +00:00

Don't depend on keys being sorted

Extract each section of the bundle and check that the expected
records are there.  The old code was assuming that the records in
each section where in a particular order which didn't happen in
practice.
This commit is contained in:
Mark Andrews
2025-04-29 16:59:15 +10:00
parent fad97e3cd1
commit 92a50dab28

View File

@@ -212,6 +212,54 @@ def check_keys(
num += 1 num += 1
def check_key_bundle(bundle_keys, bundle_lines, cdnskey=False):
count = 0
for key in bundle_keys:
found = False
for line in bundle_lines:
if key.dnskey_equals(line, cdnskey):
found = True
count += 1
assert found
assert count == len(bundle_keys)
assert count == len(bundle_lines)
def check_cds_bundle(bundle_keys, bundle_lines, expected_cds):
count = 0
for key in bundle_keys:
found = False
# the cds of this ksk must be in the ksr
for line in bundle_lines:
for alg in expected_cds:
if key.cds_equals(line, alg.strip()):
found = True
count += 1
assert found
assert count == len(expected_cds) * len(bundle_keys)
assert count == len(bundle_lines)
def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart):
count = 0
for key in bundle_keys:
found = False
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG {rrtype} {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
# there must be a signature of this ksk
for line in bundle_lines:
rrsig = " ".join(line.split())
if expect in rrsig:
found = True
count += 1
assert found
assert count == len(bundle_keys)
assert count == len(bundle_lines)
def check_keysigningrequest(out, zsks, start, end): def check_keysigningrequest(out, zsks, start, end):
lines = out.split("\n") lines = out.split("\n")
line_no = 0 line_no = 0
@@ -222,8 +270,10 @@ def check_keysigningrequest(out, zsks, start, end):
# expect bundle header # expect bundle header
assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no] assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no]
line_no += 1 line_no += 1
bundle_keys = []
bundle_lines = []
# expect zsks # expect zsks
for key in sorted(zsks): for key in zsks:
published = key.get_timing("Publish") published = key.get_timing("Publish")
if between(published, inception, next_bundle): if between(published, inception, next_bundle):
next_bundle = published next_bundle = published
@@ -237,10 +287,14 @@ def check_keysigningrequest(out, zsks, start, end):
if removed is not None and inception >= removed: if removed is not None and inception >= removed:
continue continue
# this zsk must be in the ksr # collect keys that should be in this bundle
assert key.dnskey_equals(lines[line_no]) # collect lines that should be in this bundle
bundle_keys.append(key)
bundle_lines.append(lines[line_no])
line_no += 1 line_no += 1
check_key_bundle(bundle_keys, bundle_lines)
inception = next_bundle inception = next_bundle
# ksr footer # ksr footer
@@ -298,7 +352,9 @@ def check_signedkeyresponse(
line_no += 1 line_no += 1
# expect ksks # expect ksks
for key in sorted(ksks): bundle_keys = []
bundle_lines = []
for key in ksks:
published = key.get_timing("Publish") published = key.get_timing("Publish")
if between(published, inception, next_bundle): if between(published, inception, next_bundle):
next_bundle = published next_bundle = published
@@ -313,12 +369,18 @@ def check_signedkeyresponse(
if between(removed, inception, next_bundle): if between(removed, inception, next_bundle):
next_bundle = removed next_bundle = removed
# this ksk must be in the ksr # collect keys that should be in this bundle
assert key.dnskey_equals(lines[line_no]) # collect lines that should be in this bundle
bundle_keys.append(key)
bundle_lines.append(lines[line_no])
line_no += 1 line_no += 1
check_key_bundle(bundle_keys, bundle_lines)
# expect zsks # expect zsks
for key in sorted(zsks): bundle_keys = []
bundle_lines = []
for key in zsks:
published = key.get_timing("Publish") published = key.get_timing("Publish")
if between(published, inception, next_bundle): if between(published, inception, next_bundle):
next_bundle = published next_bundle = published
@@ -332,12 +394,18 @@ def check_signedkeyresponse(
if removed is not None and inception >= removed: if removed is not None and inception >= removed:
continue continue
# this zsk must be in the ksr # collect keys that should be in this bundle
assert key.dnskey_equals(lines[line_no]) # collect lines that should be in this bundle
bundle_keys.append(key)
bundle_lines.append(lines[line_no])
line_no += 1 line_no += 1
check_key_bundle(bundle_keys, bundle_lines)
# expect rrsig(dnskey) # expect rrsig(dnskey)
for key in sorted(ksks): bundle_keys = []
bundle_lines = []
for key in ksks:
active = key.get_timing("Activate") active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False) inactive = key.get_timing("Inactive", must_exist=False)
if active > inception: if active > inception:
@@ -345,17 +413,20 @@ def check_signedkeyresponse(
if inactive is not None and inception >= inactive: if inactive is not None and inception >= inactive:
continue continue
# there must be a signature of this ksk # collect keys that should be in this bundle
alg = key.get_metadata("Algorithm") # collect lines that should be in this bundle
expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}." bundle_keys.append(key)
rrsig = " ".join(lines[line_no].split()) bundle_lines.append(lines[line_no])
assert expect in rrsig
line_no += 1 line_no += 1
check_rrsig_bundle(bundle_keys, bundle_lines, zone, "DNSKEY", sigend, sigstart)
# expect cdnskey # expect cdnskey
have_cdnskey = False have_cdnskey = False
if cdnskey: if cdnskey:
for key in sorted(ksks): bundle_keys = []
bundle_lines = []
for key in ksks:
published = key.get_timing("SyncPublish") published = key.get_timing("SyncPublish")
if between(published, inception, next_bundle): if between(published, inception, next_bundle):
next_bundle = published next_bundle = published
@@ -369,14 +440,20 @@ def check_signedkeyresponse(
if removed is not None and inception >= removed: if removed is not None and inception >= removed:
continue continue
# the cdnskey of this ksk must be in the ksr # collect keys that should be in this bundle
assert key.dnskey_equals(lines[line_no], cdnskey=True) # collect lines that should be in this bundle
bundle_keys.append(key)
bundle_lines.append(lines[line_no])
line_no += 1 line_no += 1
have_cdnskey = True have_cdnskey = True
check_key_bundle(bundle_keys, bundle_lines, cdnskey=True)
if have_cdnskey: if have_cdnskey:
# expect rrsig(cdnskey) # expect rrsig(cdnskey)
for key in sorted(ksks): bundle_keys = []
bundle_lines = []
for key in ksks:
active = key.get_timing("Activate") active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False) inactive = key.get_timing("Inactive", must_exist=False)
if active > inception: if active > inception:
@@ -384,17 +461,23 @@ def check_signedkeyresponse(
if inactive is not None and inception >= inactive: if inactive is not None and inception >= inactive:
continue continue
# there must be a signature of this ksk # collect keys that should be in this bundle
alg = key.get_metadata("Algorithm") # collect lines that should be in this bundle
expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}." bundle_keys.append(key)
rrsig = " ".join(lines[line_no].split()) bundle_lines.append(lines[line_no])
assert expect in rrsig
line_no += 1 line_no += 1
check_rrsig_bundle(
bundle_keys, bundle_lines, zone, "CDNSKEY", sigend, sigstart
)
# expect cds # expect cds
have_cds = False have_cds = False
if cds != "": if cds != "":
for key in sorted(ksks): bundle_keys = []
bundle_lines = []
expected_cds = cds.split(",")
for key in ksks:
published = key.get_timing("SyncPublish") published = key.get_timing("SyncPublish")
if between(published, inception, next_bundle): if between(published, inception, next_bundle):
next_bundle = published next_bundle = published
@@ -408,16 +491,22 @@ def check_signedkeyresponse(
if removed is not None and inception >= removed: if removed is not None and inception >= removed:
continue continue
# the cds of this ksk must be in the ksr # collect keys that should be in this bundle
expected_cds = cds.split(",") # collect lines that should be in this bundle
for alg in expected_cds: bundle_keys.append(key)
assert key.cds_equals(lines[line_no], alg.strip()) # pylint: disable=unused-variable
for _arg in expected_cds:
bundle_lines.append(lines[line_no])
line_no += 1 line_no += 1
have_cds = True have_cds = True
check_cds_bundle(bundle_keys, bundle_lines, expected_cds)
if have_cds: if have_cds:
# expect rrsig(cds) # expect rrsig(cds)
for key in sorted(ksks): bundle_keys = []
bundle_lines = []
for key in ksks:
active = key.get_timing("Activate") active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False) inactive = key.get_timing("Inactive", must_exist=False)
if active > inception: if active > inception:
@@ -425,13 +514,14 @@ def check_signedkeyresponse(
if inactive is not None and inception >= inactive: if inactive is not None and inception >= inactive:
continue continue
# there must be a signature of this ksk # collect keys that should be in this bundle
alg = key.get_metadata("Algorithm") # collect lines that should be in this bundle
expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}." bundle_keys.append(key)
rrsig = " ".join(lines[line_no].split()) bundle_lines.append(lines[line_no])
assert expect in rrsig
line_no += 1 line_no += 1
check_rrsig_bundle(bundle_keys, bundle_lines, zone, "CDS", sigend, sigstart)
inception = next_bundle inception = next_bundle
# skr footer # skr footer