diff --git a/bin/tests/system/isctest/instance.py b/bin/tests/system/isctest/instance.py index 5725b3e132..adbe2f8fb4 100644 --- a/bin/tests/system/isctest/instance.py +++ b/bin/tests/system/isctest/instance.py @@ -18,9 +18,13 @@ import os from pathlib import Path import re +import dns.message +import dns.rcode + +from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor from .run import perl -from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere +from .query import udp class NamedPorts(NamedTuple): @@ -157,6 +161,28 @@ class NamedInstance: return response + def nsupdate(self, update_msg: dns.message.Message): + """ + Issue a dynamic update to a server's zone. + """ + # FUTURE update_msg is actually dns.update.UpdateMessage, but it not + # typed properly here in order to support use of this module with + # dnspython<2.0.0 + zone = str(update_msg.zone[0].name) # type: ignore[attr-defined] + try: + response = udp( + update_msg, + self.ip, + self.ports.dns, + timeout=3, + expected_rcode=dns.rcode.NOERROR, + ) + except dns.exception.Timeout as exc: + msg = f"update timeout for {zone}" + raise dns.exception.Timeout(msg) from exc + debug(f"update of zone {zone} to server {self.ip} successful") + return response + def watch_log_from_start(self) -> WatchLogFromStart: """ Return an instance of the `WatchLogFromStart` context manager for this diff --git a/bin/tests/system/kasp/tests_kasp.py b/bin/tests/system/kasp/tests_kasp.py index 6cbb87080b..542f7f496c 100644 --- a/bin/tests/system/kasp/tests_kasp.py +++ b/bin/tests/system/kasp/tests_kasp.py @@ -938,26 +938,6 @@ def test_kasp_dynamic(servers): isctest.kasp.check_keytimes(keys, expected) check_all(server, zone, policy, keys, []) - # Update zone with nsupdate. - def nsupdate(updates): - message = dns.update.UpdateMessage(zone) - for update in updates: - if update[0] == "del": - message.delete(update[1], update[2], update[3]) - else: - assert update[0] == "add" - message.add(update[1], update[2], update[3], update[4]) - - try: - response = isctest.query.udp( - message, server.ip, server.ports.dns, timeout=3 - ) - assert response.rcode() == dns.rcode.NOERROR - except dns.exception.Timeout: - assert False, f"update timeout for {zone}" - - isctest.log.debug(f"update of zone {zone} to server {server.ip} successful") - def update_is_signed(): parts = update.split() qname = parts[0] @@ -967,24 +947,22 @@ def test_kasp_dynamic(servers): server, zone, qname, qtype, rdata, keys, [] ) - updates = [ - ["del", f"a.{zone}.", "A", "10.0.0.1"], - ["add", f"a.{zone}.", 300, "A", "10.0.0.101"], - ["add", f"d.{zone}.", 300, "A", "10.0.0.4"], - ] - nsupdate(updates) + update_msg = dns.update.UpdateMessage(zone) + update_msg.delete(f"a.{zone}.", "A", "10.0.0.1") + update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.101") + update_msg.add(f"d.{zone}.", 300, "A", "10.0.0.4") + server.nsupdate(update_msg) expected_updates = [f"a.{zone}. A 10.0.0.101", f"d.{zone}. A 10.0.0.4"] for update in expected_updates: isctest.run.retry_with_timeout(update_is_signed, timeout=5) - # Update zone with nsupdate (reverting the above change). - updates = [ - ["add", f"a.{zone}.", 300, "A", "10.0.0.1"], - ["del", f"a.{zone}.", "A", "10.0.0.101"], - ["del", f"d.{zone}.", "A", "10.0.0.4"], - ] - nsupdate(updates) + # Update zone (reverting the above change). + update_msg = dns.update.UpdateMessage(zone) + update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.1") + update_msg.delete(f"a.{zone}.", "A", "10.0.0.101") + update_msg.delete(f"d.{zone}.", "A", "10.0.0.4") + server.nsupdate(update_msg) update = f"a.{zone}. A 10.0.0.1" isctest.run.retry_with_timeout(update_is_signed, timeout=5) diff --git a/bin/tests/system/rollover/tests_rollover.py b/bin/tests/system/rollover/tests_rollover.py index 0eec2932bd..a3bb20650e 100644 --- a/bin/tests/system/rollover/tests_rollover.py +++ b/bin/tests/system/rollover/tests_rollover.py @@ -263,22 +263,6 @@ def test_rollover_multisigner(servers): return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8") - def nsupdate(updates): - message = dns.update.UpdateMessage(zone) - for update in updates: - if update[0] == 0: - message.delete(update[1], update[2], update[3]) - else: - message.add(update[1], update[2], update[3], update[4]) - - try: - response = isctest.query.udp( - message, server.ip, server.ports.dns, timeout=3 - ) - assert response.rcode() == dns.rcode.NOERROR - except dns.exception.Timeout: - isctest.log.info(f"error: update timeout for {zone}") - zone = "multisigner-model2.kasp" isctest.kasp.check_dnssec_verify(server, zone) @@ -322,8 +306,9 @@ def test_rollover_multisigner(servers): dnskey = newkeys[0].dnskey().split() rdata = " ".join(dnskey[4:]) - updates = [[1, f"{dnskey[0]}", 3600, "DNSKEY", rdata]] - nsupdate(updates) + update_msg = dns.update.UpdateMessage(zone) + update_msg.add(f"{dnskey[0]}", 3600, "DNSKEY", rdata) + server.nsupdate(update_msg) isctest.kasp.check_dnssec_verify(server, zone) @@ -336,11 +321,10 @@ def test_rollover_multisigner(servers): # Remove ZSKs from the other providers for zone. dnskey2 = extkeys[0].dnskey().split() rdata2 = " ".join(dnskey2[4:]) - updates = [ - [0, f"{dnskey[0]}", "DNSKEY", rdata], - [0, f"{dnskey2[0]}", "DNSKEY", rdata2], - ] - nsupdate(updates) + update_msg = dns.update.UpdateMessage(zone) + update_msg.delete(f"{dnskey[0]}", "DNSKEY", rdata) + update_msg.delete(f"{dnskey2[0]}", "DNSKEY", rdata2) + server.nsupdate(update_msg) isctest.kasp.check_dnssec_verify(server, zone)