2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 14:35:26 +00:00

chg: test: Improve WatchLog API for pytest

- Refactor and extend the `WatchLog.wait_for_line()` API:
    1. To allow for usage of one or more FlexPatterns, i.e. either plain
       strings to be matched verbatim, or regular expressions. Both can be
       used interchangeably to provide the caller to write simple and
       readable test code, while allowing for increased complexity to allow
       special cases.
    2. Always return the regex match, which allows the caller to identify
       which line was matched, as well as to extract any additional
       information, such as individual regex groups.
- Add `WatchLog.wait_for_sequence()` and `WatchLog.wait_for_all()` helper functions

Merge branch 'nicki/watchlog-improvements' into 'main'

See merge request isc-projects/bind9!10618
This commit is contained in:
Nicki Křížek
2025-07-18 12:13:12 +02:00
5 changed files with 409 additions and 168 deletions

View File

@@ -599,6 +599,17 @@ coccinelle:
- util/check-cocci
- if test "$(git status --porcelain | grep -Ev '\?\?' | wc -l)" -gt "0"; then git status --short; exit 1; fi
doctest:
<<: *precheck_job
needs: []
script:
- *configure
- meson compile -C build system-test-init
- *find_pytest
- cd bin/tests/system/isctest
- >
"$PYTEST" --noconftest --doctest-modules
pylint:
<<: *default_triggering_rules
<<: *debian_sid_amd64_image

View File

@@ -118,35 +118,28 @@ class NamedInstance:
The RNDC command will be logged to `rndc.log` (along with the server's
response) unless `log` is set to `False`.
>>> # Instances of the `NamedInstance` class are expected to be passed
>>> # to pytest tests as fixtures; here, some instances are created
>>> # directly (with a fake RNDC executor) so that doctest can work.
>>> import unittest.mock
>>> mock_rndc_executor = unittest.mock.Mock()
>>> ns1 = NamedInstance("ns1", rndc_executor=mock_rndc_executor)
>>> ns2 = NamedInstance("ns2", rndc_executor=mock_rndc_executor)
>>> ns3 = NamedInstance("ns3", rndc_executor=mock_rndc_executor)
>>> ns4 = NamedInstance("ns4", rndc_executor=mock_rndc_executor)
```python
def test_foo(servers):
# Send the "status" command to ns1. An `RNDCException` will be
# raised if the RNDC command fails. This command will be logged.
response = servers["ns1"].rndc("status")
>>> # Send the "status" command to ns1. An `RNDCException` will be
>>> # raised if the RNDC command fails. This command will be logged.
>>> response = ns1.rndc("status")
# Send the "thaw foo" command to ns2. No exception will be raised
# in case the RNDC command fails. This command will be logged
# (even if it fails).
response = servers["ns2"].rndc("thaw foo", ignore_errors=True)
>>> # Send the "thaw foo" command to ns2. No exception will be raised
>>> # in case the RNDC command fails. This command will be logged
>>> # (even if it fails).
>>> response = ns2.rndc("thaw foo", ignore_errors=True)
# Send the "stop" command to ns3. An `RNDCException` will be
# raised if the RNDC command fails, but this command will not be
# logged (the server's response will still be returned to the
# caller, though).
response = servers["ns3"].rndc("stop", log=False)
>>> # Send the "stop" command to ns3. An `RNDCException` will be
>>> # raised if the RNDC command fails, but this command will not be
>>> # logged (the server's response will still be returned to the
>>> # caller, though).
>>> response = ns3.rndc("stop", log=False)
>>> # Send the "halt" command to ns4 in "fire & forget mode": no
>>> # exceptions will be raised and no logging will take place (the
>>> # server's response will still be returned to the caller, though).
>>> response = ns4.rndc("stop", ignore_errors=True, log=False)
# Send the "halt" command to ns4 in "fire & forget mode": no
# exceptions will be raised and no logging will take place (the
# server's response will still be returned to the caller, though).
response = servers["ns4"].rndc("stop", ignore_errors=True, log=False)
```
"""
try:
response = self._rndc_executor.call(self.ip, self.ports.rndc, command)
@@ -183,19 +176,23 @@ class NamedInstance:
debug(f"update of zone {zone} to server {self.ip} successful")
return response
def watch_log_from_start(self) -> WatchLogFromStart:
def watch_log_from_start(
self, timeout: float = WatchLogFromStart.DEFAULT_TIMEOUT
) -> WatchLogFromStart:
"""
Return an instance of the `WatchLogFromStart` context manager for this
`named` instance's log file.
"""
return WatchLogFromStart(self.log.path)
return WatchLogFromStart(self.log.path, timeout)
def watch_log_from_here(self) -> WatchLogFromHere:
def watch_log_from_here(
self, timeout: float = WatchLogFromHere.DEFAULT_TIMEOUT
) -> WatchLogFromHere:
"""
Return an instance of the `WatchLogFromHere` context manager for this
`named` instance's log file.
"""
return WatchLogFromHere(self.log.path)
return WatchLogFromHere(self.log.path, timeout)
def reconfigure(self) -> None:
"""

View File

@@ -9,17 +9,27 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Iterator, Optional, TextIO, Dict, Any, Union, Pattern
from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVar, Union
import abc
import os
import re
import time
FlexPattern = Union[str, Pattern]
T = TypeVar("T")
OneOrMore = Union[T, List[T]]
class WatchLogException(Exception):
pass
class WatchLogTimeout(WatchLogException):
pass
class LogFile:
"""
Log file wrapper with a path and means to find a string in its contents.
@@ -54,6 +64,94 @@ class LogFile:
assert False, f"forbidden message appeared in log {self.path}: {msg}"
class LineReader:
"""
>>> import io
>>> file = io.StringIO("complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then incomplete line")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then another complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
and then another complete line
>>> file = io.StringIO()
>>> line_reader = LineReader(file)
>>> for chunk in (
... "first line\\nsecond line\\nthi",
... "rd ",
... "line\\nfour",
... "th line\\n\\nfifth line\\n"
... ):
... print("=== OUTER ITERATION ===")
... pos = file.tell()
... print(chunk, end="", file=file)
... _ = file.seek(pos)
... for line in line_reader.readlines():
... print("--- inner iteration ---")
... print(line.strip() or "<blank>")
=== OUTER ITERATION ===
--- inner iteration ---
first line
--- inner iteration ---
second line
=== OUTER ITERATION ===
=== OUTER ITERATION ===
--- inner iteration ---
third line
=== OUTER ITERATION ===
--- inner iteration ---
fourth line
--- inner iteration ---
<blank>
--- inner iteration ---
fifth line
"""
def __init__(self, stream: TextIO):
self._stream = stream
self._linebuf = ""
def readline(self) -> Optional[str]:
"""
Wrapper around io.readline() function to handle unfinished lines.
If a line ends with newline character, it's returned immediately.
If a line doesn't end with a newline character, the read contents are
buffered until the next call of this function and None is returned
instead.
"""
read = self._stream.readline()
if not read.endswith("\n"):
self._linebuf += read
return None
read = self._linebuf + read
self._linebuf = ""
return read
def readlines(self) -> Iterator[str]:
"""
Wrapper around io.readline() which only returns finished lines.
"""
while True:
line = self.readline()
if line is None:
return
yield line
class WatchLog(abc.ABC):
"""
Wait for a log message to appear in a text file.
@@ -65,44 +163,95 @@ class WatchLog(abc.ABC):
by the `NamedInstance` class (see below for recommended usage patterns).
"""
def __init__(self, path: str) -> None:
DEFAULT_TIMEOUT = 10.0
def __init__(self, path: str, timeout: float = DEFAULT_TIMEOUT) -> None:
"""
`path` is the path to the log file to watch.
`timeout` is the number of seconds (float) to wait for each wait call.
Every instance of this class must call one of the `wait_for_*()`
methods exactly once or else an `Exception` is thrown.
methods at least once or else an `Exception` is thrown.
>>> with WatchLogFromStart("/dev/null") as watcher:
... print("Just print something without waiting for a log line")
Traceback (most recent call last):
...
Exception: wait_for_*() was not called
isctest.log.watchlog.WatchLogException: wait_for_*() was not called
>>> with WatchLogFromHere("/dev/null") as watcher:
... try:
... watcher.wait_for_line("foo", timeout=0)
... except TimeoutError:
... pass
... try:
... watcher.wait_for_lines({"bar": 42}, timeout=0)
... except TimeoutError:
... pass
>>> with WatchLogFromHere("/dev/null", timeout=0.0) as watcher:
... watcher.wait_for_line("foo")
Traceback (most recent call last):
...
Exception: wait_for_*() was already called
isctest.log.watchlog.WatchLogException: timeout must be greater than 0
"""
self._fd = None # type: Optional[TextIO]
self._fd: Optional[TextIO] = None
self._reader: Optional[LineReader] = None
self._path = path
self._wait_function_called = False
if timeout <= 0.0:
raise WatchLogException("timeout must be greater than 0")
self._timeout = timeout
self._deadline = 0.0
def wait_for_line(self, string: str, timeout: int = 10) -> None:
def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> List[Pattern]:
self._wait_function_called = True
self._deadline = time.monotonic() + self._timeout
return self._prepare_patterns(patterns)
def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> List[Pattern]:
"""
Block execution until a line containing the provided `string` appears
in the log file. Return `None` once the line is found or raise a
`TimeoutError` after `timeout` seconds (default: 10) if `string` does
not appear in the log file (strings and regular expressions are
supported). (Catching this exception is discouraged as it indicates
that the test code did not behave as expected.)
Convert a mix of string(s) and/or pattern(s) into a list of patterns.
Any strings are converted into regular expression patterns that match
the string verbatim.
"""
patterns = []
if not isinstance(strings, list):
strings = [strings]
for string in strings:
if isinstance(string, Pattern):
patterns.append(string)
elif isinstance(string, str):
pattern = re.compile(re.escape(string))
patterns.append(pattern)
else:
raise WatchLogException(
"only string and re.Pattern allowed for matching"
)
return patterns
def _wait_for_match(self, regexes: List[Pattern]) -> Match:
if not self._reader:
raise WatchLogException(
"use WatchLog as context manager before calling wait_for_*() functions"
)
while time.monotonic() < self._deadline:
for line in self._reader.readlines():
for regex in regexes:
match = regex.search(line)
if match:
return match
time.sleep(0.1)
raise WatchLogTimeout(
f"Timeout reached watching {self._path} for "
f"{' | '.join([regex.pattern for regex in regexes])}"
)
def wait_for_line(self, patterns: OneOrMore[FlexPattern]) -> Match:
"""
Block execution until any line of interest appears in the log file.
`patterns` accepts one value or a list of values, with each value being
either a regular expression pattern, or a string which should be
matched verbatim (without interpreting it as a regular expression).
If any of the patterns is found anywhere within a line in the log file,
return the match, allowing access to the matched line, the regex
groups, and the regex which matched. See re.Match for more.
A `WatchLogTimeout` is raised if the function fails to find any of the
`patterns` in the allotted time.
Recommended use:
@@ -110,13 +259,27 @@ class WatchLog(abc.ABC):
import isctest
def test_foo(servers):
with servers["ns1"].watch_log_from_start() as watcher:
watcher.wait_for_line("all zones loaded")
pattern = re.compile(r"next key event in ([0-9]+) seconds")
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
watcher.wait_for_line("foo bar")
match = watcher.wait_for_line(pattern)
seconds = int(match.groups(1))
strings = [
"freezing zone",
"thawing zone",
]
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
match = watcher.wait_for_line(strings)
line = match.string
```
One of `wait_for_line()` or `wait_for_lines()` must be called exactly
once for every `WatchLogFrom*` instance.
`wait_for_line()` must be called exactly once for every `WatchLog`
instance.
>>> # For `WatchLogFromStart`, `wait_for_line()` returns without
>>> # raising an exception as soon as the line being looked for appears
@@ -124,135 +287,205 @@ class WatchLog(abc.ABC):
>>> # after the `with` statement is reached.
>>> import tempfile
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo", file=file, flush=True)
... print("foo bar baz", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
... retval = watcher.wait_for_line("foo", timeout=1)
>>> print(retval)
None
... match = watcher.wait_for_line("bar")
>>> print(match.string.strip())
foo bar baz
>>> with tempfile.NamedTemporaryFile("w") as file:
... with WatchLogFromStart(file.name) as watcher:
... print("foo", file=file, flush=True)
... retval = watcher.wait_for_line("foo", timeout=1)
>>> print(retval)
None
... print("foo bar baz", file=file, flush=True)
... match = watcher.wait_for_line("bar")
>>> print(match.group(0))
bar
>>> # For `WatchLogFromHere`, `wait_for_line()` only returns without
>>> # raising an exception if the string being looked for appears in
>>> # the log file after the `with` statement is reached.
>>> import tempfile
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo", file=file, flush=True)
... with WatchLogFromHere(file.name) as watcher:
... watcher.wait_for_line("foo", timeout=1) #doctest: +ELLIPSIS
... print("foo bar baz", file=file, flush=True)
... with WatchLogFromHere(file.name, timeout=0.1) as watcher:
... watcher.wait_for_line("bar") #doctest: +ELLIPSIS
Traceback (most recent call last):
...
TimeoutError: Timeout reached watching ...
isctest.log.watchlog.WatchLogTimeout: ...
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo", file=file, flush=True)
... print("foo bar baz", file=file, flush=True)
... with WatchLogFromHere(file.name) as watcher:
... print("foo", file=file, flush=True)
... retval = watcher.wait_for_line("foo", timeout=1)
>>> print(retval)
None
"""
return self._wait_for({string: None}, timeout)
def wait_for_lines(
self, strings: Dict[Union[str, Pattern], Any], timeout: int = 10
) -> None:
"""
Block execution until a line of interest appears in the log file. This
function is a "multi-match" variant of `wait_for_line()` which is
useful when some action may cause several different (mutually
exclusive) messages to appear in the log file.
`strings` is a `dict` associating each string to look for with the
value this function should return when that string is found in the log
file (strings and regular expressions are supported). If none of the
`strings` being looked for appear in the log file after `timeout`
seconds, a `TimeoutError` is raised. (Catching this exception is
discouraged as it indicates that the test code did not behave as
expected.)
Since `strings` is a `dict` and preserves key order (in CPython 3.6 as
implementation detail, since 3.7 by language design), each line is
checked against each key in order until the first match. Values provided
in the `strings` dictionary (i.e. values which this function is expected
to return upon a successful match) can be of any type.
Recommended use:
```python
import isctest
def test_foo(servers):
triggers = {
"message A": "value returned when message A is found",
"message B": "value returned when message B is found",
}
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
retval = watcher.wait_for_lines(triggers)
```
One of `wait_for_line()` or `wait_for_lines()` must be called exactly
once for every `WatchLogFromHere` instance.
... print("bar qux", file=file, flush=True)
... match = watcher.wait_for_line("bar")
>>> print(match.string.strip())
bar qux
>>> # Different values must be returned depending on which line is
>>> # found in the log file.
>>> import tempfile
>>> triggers = {"foo": 42, "bar": 1337}
>>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo bar 3", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
... match1 = watcher.wait_for_line(patterns)
... with WatchLogFromHere(file.name) as watcher:
... print("baz qux", file=file, flush=True)
... match2 = watcher.wait_for_line(patterns)
>>> print(match1.group(1))
3
>>> print(match2.group(0))
qux
"""
regexes = self._setup_wait(patterns)
return self._wait_for_match(regexes)
def wait_for_sequence(self, patterns: List[FlexPattern]) -> List[Match]:
"""
Block execution until the specified pattern sequence is found in the
log file.
`patterns` is a list of values, with each value being either a regular
expression pattern, or a string which should be matched verbatim
(without interpreting it as a regular expression). Order of patterns is
important, as each pattern is looked for only after all the previous
patterns have matched.
All the matches are returned as a list.
A `WatchLogTimeout` is raised if the function fails to find all of the
`patterns` in the given order in the allotted time.
>>> import tempfile
>>> seq = ['a', 'b', 'c']
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("b", file=file, flush=True)
... print("a", file=file, flush=True)
... print("b", file=file, flush=True)
... print("z", file=file, flush=True)
... print("c", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
... ret = watcher.wait_for_sequence(seq)
>>> assert ret[0].group(0) == "a"
>>> assert ret[1].group(0) == "b"
>>> assert ret[2].group(0) == "c"
>>> import tempfile
>>> seq = ['a', 'b', 'c']
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("b", file=file, flush=True)
... print("a", file=file, flush=True)
... print("c", file=file, flush=True)
... with WatchLogFromStart(file.name, timeout=0.1) as watcher:
... ret = watcher.wait_for_sequence(seq) #doctest: +ELLIPSIS
Traceback (most recent call last):
...
isctest.log.watchlog.WatchLogTimeout: ...
>>> import tempfile
>>> seq = ['a', 'b', 'c']
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("b", file=file, flush=True)
... print("a", file=file, flush=True)
... print("b", file=file, flush=True)
... with WatchLogFromStart(file.name, timeout=0.1) as watcher:
... ret = watcher.wait_for_sequence(seq) #doctest: +ELLIPSIS
Traceback (most recent call last):
...
isctest.log.watchlog.WatchLogTimeout: ...
>>> import tempfile
>>> seq = ['a', 'b', 'c']
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("b", file=file, flush=True)
... print("a", file=file, flush=True)
... print("c", file=file, flush=True)
... print("b", file=file, flush=True)
... with WatchLogFromStart(file.name, timeout=0.1) as watcher:
... ret = watcher.wait_for_sequence(seq) #doctest: +ELLIPSIS
Traceback (most recent call last):
...
isctest.log.watchlog.WatchLogTimeout: ...
"""
regexes = self._setup_wait(patterns)
matches = []
for regex in regexes:
match = self._wait_for_match([regex])
matches.append(match)
return matches
def wait_for_all(self, patterns: List[FlexPattern]) -> List[Match]:
"""
Block execution until all the specified patterns are found in the
log file in any order.
`patterns` is a list of values, with each value being either a regular
expression pattern, or a string which should be matched verbatim
(without interpreting it as a regular expression). Order of patterns is
irrelevant and they may appear in any order.
All the matches are returned as a list. The matches are listed in the
order of appearance. Pattern may match more than once, and all the
matches are included. To pair matches with the patterns, re.Match.re
may be used.
A `WatchLogTimeout` is raised if the function fails to find all of the
`patterns` in the allotted time.
>>> import tempfile
>>> patterns = ['foo', 'bar']
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("bar", file=file, flush=True)
... print("foo", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
... retval1 = watcher.wait_for_lines(triggers, timeout=1)
... with WatchLogFromHere(file.name) as watcher:
... print("bar", file=file, flush=True)
... retval2 = watcher.wait_for_lines(triggers, timeout=1)
>>> print(retval1)
42
>>> print(retval2)
1337
"""
return self._wait_for(strings, timeout)
... ret = watcher.wait_for_all(patterns)
>>> assert ret[0].group(0) == "bar"
>>> assert ret[1].group(0) == "foo"
def _wait_for(self, patterns: Dict[Union[str, Pattern], Any], timeout: int) -> Any:
>>> import tempfile
>>> bar_pattern = re.compile('bar')
>>> patterns = ['foo', bar_pattern]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("bar", file=file, flush=True)
... print("baz", file=file, flush=True)
... print("bar", file=file, flush=True)
... print("foo", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
... ret = watcher.wait_for_all(patterns)
>>> assert len(ret) == 3
>>> assert ret[0].group(0) == "bar"
>>> assert ret[1].group(0) == "bar"
>>> assert ret[2].group(0) == "foo"
>>> assert ret[0].re == bar_pattern
>>> assert ret[1].re == bar_pattern
>>> assert ret[2].re.pattern == "foo"
>>> import tempfile
>>> patterns = ['foo', 'bar']
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo", file=file, flush=True)
... print("quux", file=file, flush=True)
... with WatchLogFromStart(file.name, timeout=0.1) as watcher:
... ret = watcher.wait_for_all(patterns) #doctest: +ELLIPSIS
Traceback (most recent call last):
...
isctest.log.watchlog.WatchLogTimeout: ...
"""
Block execution until one of the `strings` being looked for appears in
the log file. Raise a `TimeoutError` if none of the `strings` being
looked for are found in the log file for `timeout` seconds.
"""
if self._wait_function_called:
raise WatchLogException("wait_for_*() was already called")
self._wait_function_called = True
if not self._fd:
raise WatchLogException("No file to watch")
leftover = ""
assert timeout, "Do not use this class unless you want to WAIT for something."
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
for line in self._fd.readlines():
if line[-1] != "\n":
# Line is not completely written yet, buffer and keep on waiting
leftover += line
else:
line = leftover + line
leftover = ""
for string, retval in patterns.items():
if isinstance(string, Pattern) and string.search(line):
return retval
if isinstance(string, str) and string in line:
return retval
time.sleep(0.1)
raise TimeoutError(
"Timeout reached watching {} for {}".format(
self._path, list(patterns.keys())
)
)
regexes = self._setup_wait(patterns)
unmatched_regexes = set(regexes)
matches = []
while unmatched_regexes:
match = self._wait_for_match(regexes)
matches.append(match)
unmatched_regexes.discard(match.re)
return matches
def __enter__(self) -> Any:
self._fd = open(self._path, encoding="utf-8")
self._seek_on_enter()
self._reader = LineReader(self._fd)
return self
@abc.abstractmethod
@@ -269,8 +502,9 @@ class WatchLog(abc.ABC):
def __exit__(self, *_: Any) -> None:
if not self._wait_function_called:
raise WatchLogException("wait_for_*() was not called")
if self._fd:
self._fd.close()
self._reader = None
assert self._fd
self._fd.close()
class WatchLogFromStart(WatchLog):
@@ -291,5 +525,5 @@ class WatchLogFromHere(WatchLog):
"""
def _seek_on_enter(self) -> None:
if self._fd:
self._fd.seek(0, os.SEEK_END)
assert self._fd
self._fd.seek(0, os.SEEK_END)

View File

@@ -30,7 +30,9 @@ live_internet_test = pytest.mark.skipif(
def feature_test(feature):
feature_test_bin = os.environ["FEATURETEST"]
feature_test_bin = os.environ.get("FEATURETEST")
if not feature_test_bin: # this can be the case when running doctest
return False
try:
subprocess.run([feature_test_bin, feature], check=True)
except subprocess.CalledProcessError as exc:

View File

@@ -75,9 +75,6 @@ def test_xferquota(named_port, servers):
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
)
with servers["ns2"].watch_log_from_start() as watcher:
watcher.wait_for_line(
pattern,
timeout=30,
)
with servers["ns2"].watch_log_from_start(timeout=30) as watcher:
watcher.wait_for_line(pattern)
query_and_compare(a_msg)