mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-28 13:08:06 +00:00
Use context managers as suggested by PyLint 2.8.2
PyLint 2.8.2 reports the following suggestions for two Python scripts used in the system test suite: ************* Module tests_rndc_deadlock bin/tests/system/addzone/tests_rndc_deadlock.py:71:4: R1732: Consider using 'with' for resource-allocating operations (consider-using-with) ************* Module tests-shutdown bin/tests/system/shutdown/tests-shutdown.py:68:4: R1732: Consider using 'with' for resource-allocating operations (consider-using-with) bin/tests/system/shutdown/tests-shutdown.py:154:8: R1732: Consider using 'with' for resource-allocating operations (consider-using-with) Implement the above suggestions by using concurrent.futures.ThreadPoolExecutor() and subprocess.Popen() as context managers.
This commit is contained in:
parent
71284cb496
commit
a8163551ed
@ -68,13 +68,13 @@ def test_rndc_deadlock():
|
||||
test_state = {'finished': False}
|
||||
|
||||
# Create 4 worker threads running "rndc" commands in a loop.
|
||||
executor = concurrent.futures.ThreadPoolExecutor()
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
for i in range(1, 5):
|
||||
domain = 'example%d' % i
|
||||
executor.submit(rndc_loop, test_state, domain)
|
||||
|
||||
# Run "rndc status" in 1-second intervals for a maximum of 10 seconds. If
|
||||
# any "rndc status" command fails, the loop will be interrupted.
|
||||
# Run "rndc status" in 1-second intervals for a maximum of 10 seconds.
|
||||
# If any "rndc status" command fails, the loop will be interrupted.
|
||||
server_is_responsive = True
|
||||
attempts = 10
|
||||
while server_is_responsive and attempts > 0:
|
||||
@ -84,7 +84,6 @@ def test_rndc_deadlock():
|
||||
|
||||
# Signal worker threads that the test is finished.
|
||||
test_state['finished'] = True
|
||||
executor.shutdown()
|
||||
|
||||
# Check whether all "rndc status" commands succeeded.
|
||||
assert server_is_responsive
|
||||
|
@ -65,7 +65,7 @@ def do_work(named_proc, resolver, rndc_cmd, kill_method, n_workers, n_queries):
|
||||
|
||||
# We're going to execute queries in parallel by means of a thread pool.
|
||||
# dnspython functions block, so we need to circunvent that.
|
||||
executor = ThreadPoolExecutor(n_workers + 1)
|
||||
with ThreadPoolExecutor(n_workers + 1) as executor:
|
||||
|
||||
# Helper dict, where keys=Future objects and values are tags used
|
||||
# to process results later.
|
||||
@ -123,8 +123,6 @@ def do_work(named_proc, resolver, rndc_cmd, kill_method, n_workers, n_queries):
|
||||
if kill_method == "rndc":
|
||||
assert ret_code == 0
|
||||
|
||||
executor.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.dnspython
|
||||
def test_named_shutdown(named_port, control_port):
|
||||
@ -149,14 +147,6 @@ def test_named_shutdown(named_port, control_port):
|
||||
rndc_cmd = [rndc, "-c", rndc_cfg, "-p", str(control_port),
|
||||
"-s", "10.53.0.3"]
|
||||
|
||||
# Helper function, launch named without blocking.
|
||||
def launch_named():
|
||||
proc = subprocess.Popen([named, "-c", cfg_file, "-f"], cwd=cfg_dir)
|
||||
# Ensure named is running
|
||||
assert proc.poll() is None
|
||||
|
||||
return proc
|
||||
|
||||
# We create a resolver instance that will be used to send queries.
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = ['10.53.0.3']
|
||||
@ -167,7 +157,10 @@ def test_named_shutdown(named_port, control_port):
|
||||
# Method 2: killing with SIGTERM
|
||||
# In both methods named should exit gracefully.
|
||||
for kill_method in ("rndc", "sigterm"):
|
||||
named_proc = launch_named()
|
||||
named_cmdline = [named, "-c", cfg_file, "-f"]
|
||||
with subprocess.Popen(named_cmdline, cwd=cfg_dir) as named_proc:
|
||||
# Ensure named is running
|
||||
assert named_proc.poll() is None
|
||||
# wait for named to finish loading
|
||||
for _ in range(10):
|
||||
try:
|
||||
|
Loading…
x
Reference in New Issue
Block a user