2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-22 01:49:48 +00:00

[#3287] fix pylint warnings

- C0115: Missing class docstring (missing-class-docstring)
- C0123: Use isinstance() rather than type() for a typecheck. (unidiomatic-typecheck)
- C0201: Consider iterating the dictionary directly instead of calling .keys() (consider-iterating-dictionary)
- C0206: Consider iterating with .items() (consider-using-dict-items)
- C0411: standard import "..." should be placed before "..." (wrong-import-order)
- C0415: Import outside toplevel (...) (import-outside-toplevel)
- C1802: Do not use `len(SEQUENCE)` without comparison to determine if a sequence is empty (use-implicit-booleaness-not-len)
- E0001: Parsing failed: 'invalid syntax (<unknown>, line 2313)' (syntax-error)
- E0401: Unable to import '...' (import-error)
- E0602: Undefined variable 'l' (undefined-variable)
- R0205: Class 'VagrantEnv' inherits from object, can be safely removed from bases in python3 (useless-object-inheritance)
- E1101: Instance of 'NSECBASE' has no 'dump_fixedpart' member (no-member)
- E1123: Unexpected keyword argument 'capture' in method call (unexpected-keyword-arg)
- R0902: Too many instance attributes (too-many-instance-attributes)
- R0913: Too many arguments (too-many-arguments)
- R0916: Too many boolean expressions in if statement (6/5) (too-many-boolean-expressions)
- R1717: Consider using a dictionary comprehension (consider-using-dict-comprehension)
- R1722: Consider using 'sys.exit' instead (consider-using-sys-exit)
- R1732: Consider using 'with' for resource-allocating operations (consider-using-with)
- R1735: Consider using '{}' instead of a call to 'dict'. (use-dict-literal)
- W0102: Dangerous default value sys.argv[1:] (builtins.list) as argument (dangerous-default-value)
- W0102: Dangerous default value {} as argument (dangerous-default-value)
- W0106: Expression "[f.write('%02x' % x) for x in bin_address]" is assigned to nothing (expression-not-assigned)
- W0107: Unnecessary pass statement (unnecessary-pass)
- W0201: Attribute 'config' defined outside __init__ (attribute-defined-outside-init)
- W0404: Reimport '...' (imported line ...) (reimported)
- W0611: Unused import ... (unused-import)
- W0612: Unused variable '...' (unused-variable)
- W0613: Unused argument '...' (unused-argument)
- W0621: Redefining name '...' from outer scope (line 1471) (redefined-outer-name)
- W0622: Redefining built-in '...' (redefined-builtin)
- W0707: Consider explicitly re-raising using 'raise ... from ...' (raise-missing-from)
- W0718: Catching too general exception Exception (broad-exception-caught)
- W1202: Use lazy % formatting in logging functions (logging-format-interpolation)
- W1203: Use lazy % formatting in logging functions (logging-fstring-interpolation)
- W1308: Duplicate string formatting argument 'connection_type', consider passing as named argument (duplicate-string-formatting-argument)
- W1401: Anomalous backslash in string: '\/'. String constant might be missing an r prefix. (anomalous-backslash-in-string)
- W1406: The u prefix for strings is no longer necessary in Python >=3.0 (redundant-u-string-prefix)
- W1514: Using open without explicitly specifying an encoding (unspecified-encoding)
- W4901: Deprecated module 'optparse' (deprecated-module)
- W4904: Using deprecated class SafeConfigParser of module configparser (deprecated-class)
This commit is contained in:
Andrei Pavel 2024-06-07 11:43:43 +03:00
parent 8e37580e59
commit 9c35a4db68
No known key found for this signature in database
GPG Key ID: D4E804481939CB21
12 changed files with 290 additions and 303 deletions

View File

@ -33,7 +33,7 @@ def read_input_files(files):
if name.startswith('_'):
print("Skipping %s (starts with underscore)" % f)
continue
with open(f) as fp:
with open(f, encoding='utf-8') as fp:
print("Processing %s" % f)
# use OrderedDict to preserve order of fields in cmd-syntax
try:
@ -42,7 +42,7 @@ def read_input_files(files):
print(f'\nError while processing {f}: {e}\n\n')
raise
if name != descr['name']:
exit("Expected name == descr['name'], but name is {name} and descr['name'] is {descr['name']}")
raise ValueError("Expected name == descr['name']. Name is {name} and descr['name'] is {descr['name']}.")
apis[name] = descr
@ -196,7 +196,7 @@ def generate(in_files, out_file):
rst = generate_rst(apis)
if out_file:
with open(out_file, 'w') as f:
with open(out_file, 'w', encoding='utf-8') as f:
f.write(rst)
print('Wrote generated RST content to: %s' % out_file)
else:

View File

@ -6,31 +6,35 @@
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
import os
import sys
from shutil import copyfile
# -- Path setup --------------------------------------------------------------
# to avoid sphinx.errors.SphinxParallelError: RecursionError: maximum recursion depth exceeded while pickling an object
sys.setrecursionlimit(5000)
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
SRC_DIR = os.path.abspath(os.path.dirname(__file__))
sys.path.append(SRC_DIR)
# to avoid sphinx.errors.SphinxParallelError: RecursionError: maximum recursion depth exceeded while pickling an object
import sys
sys.setrecursionlimit(5000)
import api2doc # noqa # pylint: disable=wrong-import-position
import mes2doc # noqa # pylint: disable=wrong-import-position
# -- Project information -----------------------------------------------------
project = 'Kea'
copyright = '2019-2024, Internet Systems Consortium'
copyright = '2019-2024, Internet Systems Consortium' # pylint: disable=redefined-builtin
author = 'Internet Systems Consortium'
# get current kea version
config_ac_path = '../../configure.ac'
changelog_path = '../../ChangeLog'
release = 'UNRELEASED'
with open(config_ac_path) as f:
with open(config_ac_path, encoding='utf-8') as f:
for line in f.readlines():
if line.startswith('AC_INIT(kea'):
parts = line.split(',')
@ -39,7 +43,7 @@ with open(config_ac_path) as f:
# that this is the final release.
dash_parts = release.split('-')
candidate_release = dash_parts[0]
with open(changelog_path) as changelog_file:
with open(changelog_path, encoding='utf-8') as changelog_file:
first_line = changelog_file.readline()
if candidate_release in first_line and "released" in first_line:
release = candidate_release
@ -251,23 +255,15 @@ rst_prolog = """
# Do generation of api.rst and kea-messages.rst here in conf.py instead of Makefile.am
# so they are available on ReadTheDocs as there makefiles are not used for building docs.
def run_generate_docs(_):
import os
import sys
src_dir = os.path.abspath(os.path.dirname(__file__))
print(src_dir)
sys.path.append(src_dir)
import api2doc
with open(os.path.join(src_dir, 'api-files.txt')) as af:
with open(os.path.join(SRC_DIR, 'api-files.txt'), encoding='utf-8') as af:
api_files = af.read().split()
api_files = [os.path.abspath(os.path.join(src_dir, '../..', af)) for af in api_files]
api2doc.generate(api_files, os.path.join(src_dir, 'api.rst'))
api_files = [os.path.abspath(os.path.join(SRC_DIR, '../..', af)) for af in api_files]
api2doc.generate(api_files, os.path.join(SRC_DIR, 'api.rst'))
import mes2doc
with open(os.path.join(src_dir, 'mes-files.txt')) as mf:
with open(os.path.join(SRC_DIR, 'mes-files.txt'), encoding='utf-8') as mf:
mes_files = mf.read().split()
mes_files = [os.path.abspath(os.path.join(src_dir, '../..', mf)) for mf in mes_files]
mes2doc.generate(mes_files, os.path.join(src_dir, 'kea-messages.rst'))
mes_files = [os.path.abspath(os.path.join(SRC_DIR, '../..', mf)) for mf in mes_files]
mes2doc.generate(mes_files, os.path.join(SRC_DIR, 'kea-messages.rst'))
# Sphinx has some limitations. It can't import files from outside its directory, which
# in our case is src/sphinx. On the other hand, we need to have platforms.rst file
@ -292,10 +288,9 @@ def run_generate_docs(_):
['../examples/template-ha-mt-tls/kea-dhcp4-2.conf', 'template-ha-mt-tls-dhcp4-2.conf']
]
from shutil import copyfile
for [a, b] in FILES_TO_COPY:
src = os.path.join(src_dir, a)
dst = os.path.join(src_dir, 'arm', b)
src = os.path.join(SRC_DIR, a)
dst = os.path.join(SRC_DIR, 'arm', b)
print("Copying %s to %s" % (src, dst))
copyfile(src, dst)

View File

@ -36,9 +36,8 @@ def parse_args():
def read_input_files(files):
messages = {}
for f in files:
with open(f) as fp:
with open(f, encoding='utf-8') as fp:
print("Processing %s" % f)
namespace = None
msg_descr = None
msg_id = None
msg_text = None
@ -119,7 +118,7 @@ def generate(in_files, out_file):
rst = generate_rst(messages)
if out_file:
with open(out_file, 'w') as f:
with open(out_file, 'w', encoding='utf-8') as f:
f.write(rst)
print('Wrote generated RST content to: %s' % out_file)
else:

315
hammer.py
View File

@ -6,6 +6,8 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# pylint: disable=broad-exception-caught
"""Hammer - Kea development environment management tool."""
from __future__ import print_function
@ -117,10 +119,9 @@ SYSTEMS = {
'3.19': True,
'3.20': True,
},
'arch': {}
'arch': {},
}
# pylint: disable=C0326
IMAGE_TEMPLATES = {
# fedora
'fedora-27-lxc': {'bare': 'lxc-fedora-27', 'kea': 'godfryd/kea-fedora-27'},
@ -276,52 +277,37 @@ def get_system_revision():
system = platform.system()
if system == 'Linux':
system, revision = None, None
try:
system, revision, _ = platform.dist() # pylint: disable=deprecated-method
if system == 'debian':
revision = revision.split('.')[0]
elif system == 'redhat':
system = 'rhel'
revision = revision[0]
elif system == 'rocky':
revision = revision[0]
elif system == 'centos':
revision = revision[0]
if not system or not revision:
raise Exception('fallback to /etc/os-release')
except Exception:
if os.path.exists('/etc/os-release'):
vals = {}
with open('/etc/os-release') as f:
for line in f.readlines():
if '=' in line:
key, val = line.split('=', 1)
vals[key.strip()] = val.strip().replace('"', '')
if not os.path.exists('/etc/os-release'):
raise UnexpectedError('/etc/os-release does not exist. Cannot determine system or its revision.')
vals = {}
with open('/etc/os-release', encoding='utf-8') as f:
for line in f.readlines():
if '=' in line:
key, val = line.split('=', 1)
vals[key.strip()] = val.strip().replace('"', '')
for i in ['ID', 'ID_LIKE']:
if i in vals:
system_candidates = vals[i].strip('"').split()
for system_candidate in system_candidates:
if system_candidate in SYSTEMS:
system = system_candidate
break
else:
continue
for i in ['ID', 'ID_LIKE']:
if i in vals:
system_candidates = vals[i].strip('"').split()
for system_candidate in system_candidates:
if system_candidate in SYSTEMS:
system = system_candidate
break
if system is None:
raise Exception('cannot determine system')
else:
continue
break
if system is None:
raise UnexpectedError('cannot determine system')
for i in ['VERSION_ID', 'BUILD_ID']:
if i in vals:
revision = vals[i]
break
if revision is None:
raise Exception('cannot determine revision')
for i in ['VERSION_ID', 'BUILD_ID']:
if i in vals:
revision = vals[i]
break
if revision is None:
raise UnexpectedError('cannot determine revision')
if system in ['alpine', 'rhel', 'rocky']:
revision = revision.rsplit('.', 1)[0]
else:
raise Exception('cannot determine system or its revision')
if system in ['alpine', 'rhel', 'rocky']:
revision = revision.rsplit('.', 1)[0]
elif system == 'FreeBSD':
system = system.lower()
revision = platform.release()
@ -336,7 +322,10 @@ def get_system_revision():
class ExecutionError(Exception):
"""Exception thrown when execution encountered an error."""
pass
class UnexpectedError(Exception):
"""Exception thrown when an unexpected error occurred that hammer does not know how to recover from."""
def execute(cmd, timeout=60, cwd=None, env=None, raise_error=True, dry_run=False, log_file_path=None,
@ -379,57 +368,43 @@ def execute(cmd, timeout=60, cwd=None, env=None, raise_error=True, dry_run=False
cmd = cmd.replace('sudo', 'sudo -E')
if log_file_path:
log_file = open(log_file_path, "wb")
with open(log_file_path, "wb", encoding='utf-8') as file:
log_file = file.read()
for attempt in range(attempts):
if interactive:
# Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified,
# security issue.
p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True) # nosec B602
exitcode = p.wait()
with subprocess.Popen(cmd, cwd=cwd, env=env, shell=True) as pipe: # nosec: B602
pipe.communicate()
exitcode = pipe.returncode
else:
# Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified,
# security issue.
p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True, # nosec B602
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if capture:
output = ''
t0 = time.time()
# Repeat until the process has stopped and there are no more lines
# to read, or until the timeout is up.
while True:
line = p.stdout.readline()
if line:
line_decoded = line.decode(encoding='ascii', errors='ignore').rstrip() + '\r'
if not quiet:
log.info(line_decoded)
if capture:
output += line_decoded
if log_file_path:
log_file.write(line)
t1 = time.time()
if p.poll() is not None and not line or (timeout is not None and timeout < t1 - t0):
break
# If no exitcode yet, ie. process is still running then it means that timeout occurred.
# In such case terminate the process and raise an exception.
if p.poll() is None:
# kill using sudo to be able to kill other sudo commands
execute('sudo kill -s TERM %s' % p.pid)
time.sleep(5)
# if still running, kill harder
if p.poll() is None:
execute('sudo kill -s KILL %s' % p.pid)
msg = "Execution timeout, %d > %d seconds elapsed (start: %d, stop %d), cmd: '%s'"
msg = msg % (t1 - t0, timeout, t0, t1, cmd)
raise ExecutionError(msg)
exitcode = p.returncode
with subprocess.Popen(cmd, cwd=cwd, env=env, shell=True, # nosec: B602
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as pipe:
if timeout is not None:
pipe.wait(timeout)
try:
stdout, _ = pipe.communicate()
except subprocess.TimeoutExpired as e:
pipe.kill()
stdout2, _ = pipe.communicate()
stdout += stdout2
raise ExecutionError(f'Execution timeout: {e}, cmd: {cmd}') from e
exitcode = pipe.returncode
if capture:
output = stdout.decode('utf-8')
if not quiet:
print(stdout.decode('utf-8'))
if log_file_path is not None:
log_file.write(stdout)
if exitcode == 0:
break
elif attempt < attempts - 1:
if attempt < attempts - 1:
txt = 'command failed, retry, attempt %d/%d' % (attempt, attempts)
if log_file_path:
txt_to_file = '\n\n[HAMMER] %s\n\n\n' % txt
@ -468,7 +443,12 @@ def _prepare_installed_packages_cache_for_debs():
continue
status, name, version, arch, descr = m.groups()
name = name.split(':')[0]
pkg_cache[name] = dict(status=status, version=version, arch=arch, descr=descr)
pkg_cache[name] = {
'status': status,
'version': version,
'arch': arch,
'descr': descr,
}
return pkg_cache
@ -480,7 +460,7 @@ def _prepare_installed_packages_cache_for_rpms():
for line in out.splitlines():
name = line.strip()
pkg_cache[name] = dict(status='ii')
pkg_cache[name] = {'status': 'ii'}
return pkg_cache
@ -492,7 +472,7 @@ def _prepare_installed_packages_cache_for_alpine():
for line in out.splitlines():
name = line.strip()
pkg_cache[name] = dict(status='ii')
pkg_cache[name] = {'status': 'ii'}
return pkg_cache
@ -575,7 +555,7 @@ def get_image_template(key, variant):
return IMAGE_TEMPLATES[key][variant]
def _get_full_repo_url(repository_url, system, revision, pkg_version):
def _get_full_repo_url(repository_url, system, revision):
if not repository_url:
return None
repo_name = 'kea-%s-%s' % (system, revision)
@ -584,7 +564,7 @@ def _get_full_repo_url(repository_url, system, revision, pkg_version):
return repo_url
class VagrantEnv(object):
class VagrantEnv():
"""Helper class that makes interacting with Vagrant easier.
It creates Vagrantfile according to specified system. It exposes basic Vagrant functions
@ -619,7 +599,7 @@ class VagrantEnv(object):
self.python = None
self.key = key = "%s-%s-%s" % (system, revision, provider)
self.image_tpl = image_tpl = get_image_template(key, image_template_variant)
self.image_tpl = get_image_template(key, image_template_variant)
self.repo_dir = os.getcwd()
sys_dir = "%s-%s" % (system, revision)
@ -670,7 +650,7 @@ class VagrantEnv(object):
box_version=box_version,
hostname=hostname)
with open(vagrantfile_path, "w") as f:
with open(vagrantfile_path, "w", encoding='utf-8') as f:
f.write(vagrantfile)
log.info('Prepared vagrant system %s in %s', self.name, self.vagrant_dir)
@ -706,7 +686,7 @@ class VagrantEnv(object):
with urllib.request.urlopen(url) as response: # nosec B310
data = response.read()
except Exception as e:
log.exception(f'ignored exception: {e}')
log.exception('ignored exception: %s', e)
return {}
data = json.loads(data)
return data
@ -715,7 +695,7 @@ class VagrantEnv(object):
meta_file = os.path.join(self.vagrant_dir, '.vagrant/machines/default', self.provider, 'box_meta')
if not os.path.exists(meta_file):
return {}
with open(meta_file) as f:
with open(meta_file, encoding='utf-8') as f:
data = f.read()
data = json.loads(data)
return data
@ -751,7 +731,7 @@ class VagrantEnv(object):
_, out = execute("vagrant status", cwd=self.vagrant_dir, timeout=15, capture=True, quiet=True)
m = re.search(r'default\s+(.+)\(', out)
if not m:
raise Exception('cannot get status in:\n%s' % out)
raise UnexpectedError('cannot get status in:\n%s' % out)
return m.group(1).strip()
def bring_up_latest_box(self):
@ -813,7 +793,7 @@ class VagrantEnv(object):
# correct files ownership
execute('sudo chown `id -un`:`id -gn` *', cwd=lxc_box_dir)
# and other metadata
with open(os.path.join(lxc_box_dir, 'metadata.json'), 'w') as f:
with open(os.path.join(lxc_box_dir, 'metadata.json'), 'w', encoding='utf-8') as f:
now = datetime.datetime.now()
f.write('{\n')
f.write(' "provider": "lxc",\n')
@ -906,7 +886,7 @@ class VagrantEnv(object):
execute('scp -F %s -r default:~/kea-pkg/* .' % ssh_cfg_path, cwd=pkgs_dir)
if upload:
repo_url = _get_full_repo_url(repository_url, self.system, self.revision, pkg_version)
repo_url = _get_full_repo_url(repository_url, self.system, self.revision)
if repo_url is None:
raise ValueError('repo_url is None')
upload_cmd = 'curl -v --netrc -f'
@ -957,7 +937,7 @@ class VagrantEnv(object):
execute(cmd, cwd=self.vagrant_dir)
results_file = os.path.join(self.vagrant_dir, 'unit-test-results.json')
if os.path.exists(results_file):
with open(results_file) as f:
with open(results_file, encoding='utf-8') as f:
txt = f.read()
results = json.loads(txt)
total = results['grand_total']
@ -966,7 +946,7 @@ class VagrantEnv(object):
cmd = 'scp -F %s -r default:/home/vagrant/aggregated_tests.xml .' % ssh_cfg_path
execute(cmd, cwd=self.vagrant_dir)
except Exception as e:
log.exception(f'ignored issue with parsing unit test results: {e}')
log.exception('ignored issue with parsing unit test results: %s', e)
return total, passed
@ -987,7 +967,7 @@ class VagrantEnv(object):
execute('vagrant ssh-config > %s' % ssh_cfg_path, cwd=self.vagrant_dir)
return ssh_cfg_path
def execute(self, cmd, timeout=None, raise_error=True, log_file_path=None, quiet=False, env=None,
def execute(self, cmd, timeout=None, raise_error=True, log_file_path=None, quiet=False, env=None, capture=False,
attempts=1, sleep_time_after_attempt=None):
"""Execute provided command inside Vagrant system."""
if not env:
@ -996,7 +976,7 @@ class VagrantEnv(object):
return execute('vagrant ssh -c "%s"' % cmd, env=env, cwd=self.vagrant_dir, timeout=timeout,
raise_error=raise_error, dry_run=self.dry_run, log_file_path=log_file_path,
quiet=quiet, check_times=self.check_times,
quiet=quiet, check_times=self.check_times, capture=capture,
attempts=attempts, sleep_time_after_attempt=sleep_time_after_attempt)
def prepare_system(self):
@ -1038,7 +1018,7 @@ class VagrantEnv(object):
exitcode = self.execute(cmd, raise_error=False)
if exitcode != 0:
env = os.environ.copy()
with open(os.path.expanduser('~/rhel-creds.txt')) as f:
with open(os.path.expanduser('~/rhel-creds.txt'), encoding='utf-8') as f:
env['RHEL_USER'] = f.readline().strip()
env['RHEL_PASSWD'] = f.readline().strip()
self.execute('sudo subscription-manager register --user $RHEL_USER --password "$RHEL_PASSWD"', env=env)
@ -1114,7 +1094,7 @@ def _install_gtest_sources():
gtest_version = '1.14.0'
gtest_path = f'/usr/src/googletest-release-{gtest_version}/googletest'
if os.path.exists(gtest_path):
log.info(f'gtest is already installed in {gtest_path}.')
log.info('gtest is already installed in %s.', gtest_path)
return
execute('mkdir -p ~/.hammer-tmp')
@ -1134,7 +1114,7 @@ def _install_libyang_from_sources(ignore_errors=False):
libyang_header = f'{prefix}/include/libyang/version.h'
if (any(os.path.exists(i) for i in libyang_so_candidates) and os.path.exists(libyang_header) and
execute(f"grep -F '#define LY_VERSION_MAJOR 2' '{libyang_header}'", raise_error=False) == 0):
log.info(f'libyang is already installed at {libyang_header}.')
log.info('libyang is already installed at %s.', libyang_header)
return
version = 'v2.1.4'
@ -1149,7 +1129,7 @@ def _install_libyang_from_sources(ignore_errors=False):
cwd='~/.hammer-tmp/libyang/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/libyang/build')
execute('sudo make install', cwd='~/.hammer-tmp/libyang/build')
system, revision = get_system_revision()
system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
@ -1167,7 +1147,7 @@ def _install_sysrepo_from_sources(ignore_errors=False):
sysrepo_header = f'{prefix}/include/sysrepo/version.h'
if (any(os.path.exists(i) for i in sysrepo_so_candidates) and os.path.exists(sysrepo_header) and
execute(f"grep -F '#define SR_VERSION_MAJOR 7' '{sysrepo_header}'", raise_error=False) == 0):
log.info(f'sysrepo is already installed at {sysrepo_header}.')
log.info('sysrepo is already installed at %s.', sysrepo_header)
return
version = 'v2.2.12'
@ -1185,7 +1165,7 @@ def _install_sysrepo_from_sources(ignore_errors=False):
execute('cmake -DBUILD_TESTING=OFF -DREPO_PATH=/etc/sysrepo ..', cwd='~/.hammer-tmp/sysrepo/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/sysrepo/build')
execute('sudo make install', cwd='~/.hammer-tmp/sysrepo/build')
system, revision = get_system_revision()
system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
@ -1203,7 +1183,7 @@ def _install_libyang_cpp_from_sources(ignore_errors=False):
libyang_cpp_pc = f'{prefix_lib}/pkgconfig/libyang-cpp.pc'
if (os.path.exists(libyang_cpp_so) and os.path.exists(libyang_cpp_pc) and
execute(f"grep -F 'Version: 1.1.0' '{libyang_cpp_pc}'", raise_error=False) == 0):
log.info(f'libyang-cpp is already installed at {libyang_cpp_so}.')
log.info('libyang-cpp is already installed at %s.', libyang_cpp_so)
return
version = 'ae7d649ea75da081725c119dd553b2ef3121a6f8'
@ -1214,16 +1194,22 @@ def _install_libyang_cpp_from_sources(ignore_errors=False):
execute('git clone https://github.com/CESNET/libyang-cpp.git ~/.hammer-tmp/libyang-cpp')
execute(f'git checkout {version}', cwd='~/.hammer-tmp/libyang-cpp')
# New cpp compiler is more picky about missing headers. (ex. Fedora 40)
return_code = execute('sudo grep "#include <algorithm>" ~/.hammer-tmp/libyang-cpp/src/Context.cpp',
raise_error=False)
if return_code == 1:
execute(r'sed -i "/#include <libyang\/libyang.h>/a #include <algorithm>" '
'~/.hammer-tmp/libyang-cpp/src/Context.cpp')
execute("""git apply <<EOF
diff --git a/src/Context.cpp b/src/Context.cpp
index b2fe887..add11cc 100644
--- a/src/Context.cpp
+++ b/src/Context.cpp
@@ -13,2 +13,3 @@
#include <libyang/libyang.h>
+#include <algorithm>
#include <span>
EOF
""", cwd='~/.hammer-tmp/libyang-cpp')
execute('mkdir ~/.hammer-tmp/libyang-cpp/build')
execute('cmake -DBUILD_TESTING=OFF .. ', cwd='~/.hammer-tmp/libyang-cpp/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/libyang-cpp/build')
execute('sudo make install', cwd='~/.hammer-tmp/libyang-cpp/build')
system, revision = get_system_revision()
system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
@ -1241,7 +1227,7 @@ def _install_sysrepo_cpp_from_sources(ignore_errors=False):
sysrepo_cpp_pc = f'{prefix_lib}/pkgconfig/sysrepo-cpp.pc'
if (os.path.exists(sysrepo_cpp_so) and os.path.exists(sysrepo_cpp_pc) and
execute(f"grep -F 'Version: 1.1.0' '{sysrepo_cpp_pc}'", raise_error=False) == 0):
log.info(f'sysrepo-cpp is already installed at {sysrepo_cpp_so}.')
log.info('sysrepo-cpp is already installed at %s.', sysrepo_cpp_so)
return
version = '02634174ffc60568301c3d9b9b7cf710cff6a586'
@ -1255,7 +1241,7 @@ def _install_sysrepo_cpp_from_sources(ignore_errors=False):
execute('cmake -DBUILD_TESTING=OFF .. ', cwd='~/.hammer-tmp/sysrepo-cpp/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/sysrepo-cpp/build')
execute('sudo make install', cwd='~/.hammer-tmp/sysrepo-cpp/build')
system, revision = get_system_revision()
system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
@ -1332,7 +1318,7 @@ def _configure_mysql(system, revision, features):
exit_code = execute('openssl rsa -in src/lib/asiolink/testutils/ca/kea-server.key '
'-out src/lib/asiolink/testutils/ca/kea-server.key', raise_error=False)
if exit_code != 0:
log.warning(f'openssl command failed with exit code {exit_code}, but continuing...')
log.warning('openssl command failed with exit code %d, but continuing...', exit_code)
for file in [
'./src/lib/asiolink/testutils/ca/kea-ca.crt',
'./src/lib/asiolink/testutils/ca/kea-client.crt',
@ -1436,7 +1422,7 @@ ssl_key = {cert_dir}/kea-client.key
execute(cmd)
if system == 'debian' and revision == '9':
log.info('FIX FOR ISSUE kea#389: {} {}'.format(system, revision))
log.info('FIX FOR ISSUE kea#389: %s %s', system, revision)
cmd = "sh -c \"cat <<EOF | sudo mysql -u root\n"
cmd += "use keatest;\n"
cmd += "set global innodb_large_prefix=on;\n"
@ -1447,7 +1433,7 @@ ssl_key = {cert_dir}/kea-client.key
execute(cmd)
def _enable_postgresql(system, revision):
def _enable_postgresql(system):
if system == 'alpine':
execute('sudo rc-update add postgresql')
elif system == 'freebsd':
@ -1463,7 +1449,7 @@ def _enable_postgresql(system, revision):
execute('sudo systemctl enable postgresql.service')
def _restart_postgresql(system, revision):
def _restart_postgresql(system):
if system == 'freebsd':
# redirecting output from start script to /dev/null otherwise the postgresql rc.d script will hang
# calling restart instead of start allow hammer.py to pass even if postgresql is already installed
@ -1481,12 +1467,12 @@ def _restart_postgresql(system, revision):
exit_code = execute('sudo systemctl restart postgresql.service', raise_error=False)
if exit_code != 0:
log.error('Command "sudo systemctl restart postgresql.service" failed. Here is the journal:')
_, output = execute('sudo journalctl -xu postgresql.service', raise_error=False)
execute('sudo journalctl -xu postgresql.service', raise_error=False)
log.error('And here are the logs:')
_, output = execute("sudo -u postgres psql -A -t -c 'SELECT pg_current_logfile()'",
capture=True, quiet=True)
logfile = os.path.basename(output.strip())
_, output = execute(fr'sudo find /var -type f -name "{logfile}" -exec cat {{}} \;', raise_error=False)
execute(fr'sudo find /var -type f -name "{logfile}" -exec cat {{}} \;', raise_error=False)
sys.exit(exit_code)
@ -1495,11 +1481,12 @@ def _restart_postgresql(system, revision):
# and user both set to 'all'. This is to not affect authentication of
# `postgres` user which should have a separate entry.
def _change_postgresql_auth_method(connection_type, auth_method, hba_file):
execute(r"sudo sed -i.bak 's/^{}\(.*\)all\(.*\)all\(.*\) [a-z0-9]*$/{}\1all\2all\3 {}/g' '{}'".format(
connection_type, connection_type, auth_method, hba_file), cwd='/tmp')
execute(fr"sudo sed -i.bak 's/^{connection_type}\(.*\)all\(.*\)all\(.*\) [a-z0-9]*$"
fr"/{connection_type}\1all\2all\3 {auth_method}/g' '{hba_file}'",
cwd='/tmp')
def _configure_pgsql(system, revision, features):
def _configure_pgsql(system, features):
""" Configure PostgreSQL DB """
if system == 'freebsd':
@ -1539,8 +1526,8 @@ def _configure_pgsql(system, revision, features):
# the initial start of the postgresql will create the 'postmaster.opts' file
execute(f'sudo test ! -f {var_db_postgres_data}/postmaster.opts && sudo service postgresql onestart || true')
_enable_postgresql(system, revision)
_restart_postgresql(system, revision)
_enable_postgresql(system)
_restart_postgresql(system)
# Change auth-method to 'md5' on all connections.
cmd = "sudo -u postgres psql -t -c 'SHOW hba_file' | xargs"
@ -1560,7 +1547,7 @@ def _configure_pgsql(system, revision, features):
{}
' '{}'""".format(auth_header, postgres_auth_line, hba_file))
_restart_postgresql(system, revision)
_restart_postgresql(system)
cmd = """sh -c \"cat <<EOF | sudo -u postgres psql postgres
DROP DATABASE IF EXISTS keatest;
@ -1614,7 +1601,7 @@ def _get_package_version(package: str):
Returns the version available in the package manager's repository for the requested package.
:param package: the name of the package whose version is retrieved
"""
system, revision = get_system_revision()
system, _ = get_system_revision()
if system == 'alpine':
cmd = "apk search --exact {0} | sed 's/{0}-//g'"
elif system in ['debian', 'ubuntu']:
@ -1644,13 +1631,13 @@ def require_minimum_package_version(package: str, minimum: str):
if version < minimum:
message = f"ERROR: {package} has version {version}, but must be >= {minimum}"
log.error(message)
raise Exception(message)
raise UnexpectedError(message)
def prepare_system_local(features, check_times, ignore_errors_for, just_configure):
"""Prepare local system for Kea development based on requested features."""
system, revision = get_system_revision()
log.info(f'Preparing deps for {system} {revision}...')
log.info('Preparing deps for %s %s...', system, revision)
if not just_configure:
install_packages_local(system, revision, features, check_times, ignore_errors_for)
@ -1659,7 +1646,7 @@ def prepare_system_local(features, check_times, ignore_errors_for, just_configur
_configure_mysql(system, revision, features)
if 'pgsql' in features:
_configure_pgsql(system, revision, features)
_configure_pgsql(system, features)
log.info('Preparing deps completed successfully.')
@ -2199,7 +2186,12 @@ def _build_binaries_and_run_ut(system, revision, features, tarball_path, env, ch
failures = int(root.get('failures'))
disabled = int(root.get('disabled'))
errors = int(root.get('errors'))
results[fn] = dict(total=total, failures=failures, disabled=disabled, errors=errors)
results[fn] = {
'total': total,
'failures': failures,
'disabled': disabled,
'errors': errors,
}
grand_total += total
grand_not_passed += failures + errors
@ -2221,7 +2213,7 @@ def _build_binaries_and_run_ut(system, revision, features, tarball_path, env, ch
result = green(result)
log.info('Unit test results: %s', result)
with open('unit-test-results.json', 'w') as f:
with open('unit-test-results.json', 'w', encoding='utf-8') as f:
f.write(json.dumps(results))
# store aggregated results in XML
@ -2255,7 +2247,7 @@ def _check_installed_rpm_or_debs(services_list):
def _build_rpm(system, revision, features, tarball_path, env, check_times, dry_run,
pkg_version, pkg_isc_version, repo_url):
pkg_version, pkg_isc_version):
# unpack kea sources tarball
_, arch = execute('arch', capture=True)
@ -2318,7 +2310,7 @@ def _build_rpm(system, revision, features, tarball_path, env, check_times, dry_r
def _build_deb(system, revision, features, tarball_path, env, check_times, dry_run,
pkg_version, pkg_isc_version, repository_url, repo_url):
pkg_version, pkg_isc_version, repo_url):
_, arch = execute('arch', capture=True)
if system == 'debian' and revision == '9':
@ -2329,15 +2321,15 @@ def _build_deb(system, revision, features, tarball_path, env, check_times, dry_r
_, output = execute("curl -o /dev/null -s -w '%{{http_code}}' {}/dists/kea/Release 2>/dev/null".format(repo_url),
capture=True)
http_code = output.rstrip()
release_file_exists = (http_code == '200')
release_file_exists = http_code == '200'
if release_file_exists:
log.info(f'{repo_url}/dists/kea/Release exists.')
log.info('%s/dists/kea/Release exists.', repo_url)
else:
repo_name = 'kea-%s-%s-%s' % (pkg_version.rsplit('.', 1)[0], system, revision)
log.error(f'{repo_url}/dists/kea/Release does not exist. '
f'This is usually caused by no package existing in {repo_name}. '
log.error('%s/dists/kea/Release does not exist. '
'This is usually caused by no package existing in %s. '
'You can solve this by uploading any package.'
'Continuing, but the build will likely fail.')
'Continuing, but the build will likely fail.', repo_url, repo_name)
# try apt update for up to 10 times if there is an error
for _ in range(10):
@ -2387,8 +2379,8 @@ def _build_deb(system, revision, features, tarball_path, env, check_times, dry_r
_check_installed_rpm_or_debs(services_list)
def _build_alpine_apk(system, revision, features, tarball_path, env, check_times, dry_run,
pkg_version, pkg_isc_version, repo_url):
def _build_alpine_apk(revision, features, tarball_path, check_times, dry_run,
pkg_version, pkg_isc_version):
_, arch = execute('arch', capture=True)
# unpack tarball
execute('sudo rm -rf kea-src packages', check_times=check_times, dry_run=dry_run)
@ -2444,21 +2436,21 @@ def _build_native_pkg(system, revision, features, tarball_path, env, check_times
# enable ccache if requested
env = _prepare_ccache_if_needed(system, ccache_dir, env)
repo_url = _get_full_repo_url(repository_url, system, revision, pkg_version)
repo_url = _get_full_repo_url(repository_url, system, revision)
if repo_url is None:
raise ValueError('repo_url is None')
if system in ['fedora', 'centos', 'rhel', 'rocky']:
_build_rpm(system, revision, features, tarball_path, env, check_times, dry_run,
pkg_version, pkg_isc_version, repo_url)
pkg_version, pkg_isc_version)
elif system in ['ubuntu', 'debian']:
_build_deb(system, revision, features, tarball_path, env, check_times, dry_run,
pkg_version, pkg_isc_version, repository_url, repo_url)
pkg_version, pkg_isc_version, repo_url)
elif system in ['alpine']:
_build_alpine_apk(system, revision, features, tarball_path, env, check_times, dry_run,
pkg_version, pkg_isc_version, repo_url)
_build_alpine_apk(revision, features, tarball_path, check_times, dry_run,
pkg_version, pkg_isc_version)
elif system in ['arch']:
pass
@ -2546,7 +2538,7 @@ def build_in_vagrant(provider, system, revision, features, leave_system, tarball
except ExecutionError as e:
error = e
msg = ' - ' + red(str(e))
except Exception as e: # pylint: disable=broad-except
except Exception as e:
log.exception('Building erred')
error = e
msg = ' - ' + red(str(e))
@ -2817,9 +2809,9 @@ def parse_args():
def list_supported_systems():
"""List systems hammer can support (with supported providers)."""
for system in SYSTEMS:
print('%s:' % system)
for release, supported in SYSTEMS[system].items():
for system, revision in SYSTEMS.items():
print(f'{system}:')
for release, supported in revision.items():
if not supported:
continue
providers = []
@ -2828,7 +2820,7 @@ def list_supported_systems():
if k in IMAGE_TEMPLATES:
providers.append(p)
providers = ', '.join(providers)
print(' - %s: %s' % (release, providers))
print(f' - {release}: {providers}')
def list_created_systems():
@ -2903,10 +2895,10 @@ def _get_features(args):
for i in args.with_randomly:
if _coin_toss():
features.add(i)
log.info(f'Feature enabled through coin toss: {i}')
log.info('Feature enabled through coin toss: %s', i)
else:
features.discard(i)
log.info(f'Feature disabled through coin toss: {i}')
log.info('Feature disabled through coin toss: %s', i)
if hasattr(args, 'ccache_dir') and args.ccache_dir:
features.add('ccache')
@ -2955,20 +2947,19 @@ def _print_summary(results, features):
def _check_system_revision(system, revision):
if revision == 'all':
return
if system not in SYSTEMS.keys():
if system not in SYSTEMS:
msg = "hammer.py error: argument -s/--system: invalid choice: '%s' (choose from '%s')"
msg = msg % (revision, "', '".join(SYSTEMS.keys()))
log.error(msg)
sys.exit(1)
revs = SYSTEMS[system].keys()
if revision not in revs:
if revision not in SYSTEMS[system]:
msg = "hammer.py error: argument -r/--revision: invalid choice: '%s' (choose from '%s')"
msg = msg % (revision, "', '".join(revs))
msg = msg % (revision, "', '".join(SYSTEMS[system].keys()))
log.error(msg)
sys.exit(1)
if not SYSTEMS[system][revision]:
log.warning(f'{system} ${revision} is no longer officially supported. '
'The script will continue in a best-effort manner.')
log.warning('%s %s is no longer officially supported. '
'The script will continue in a best-effort manner.', system, revision)
def _prepare_ccache_dir(ccache_dir, system, revision):
@ -3010,7 +3001,7 @@ def prepare_system_cmd(args):
def upload_to_repo(args, pkgs_dir):
# NOTE: note the differences (if any) in system/revision vs args.system/revision
system, revision = get_system_revision()
repo_url = _get_full_repo_url(args.repository_url, system, revision, args.pkg_version)
repo_url = _get_full_repo_url(args.repository_url, system, revision)
if repo_url is None:
raise ValueError('repo_url is None')
upload_cmd = 'curl -v --netrc -f'
@ -3055,7 +3046,7 @@ def upload_to_repo(args, pkgs_dir):
log.info("Asset already exists in the repository. Skipping upload.")
break
elif exitcode != 0:
raise Exception('Upload failed: %s' % output)
raise UnexpectedError('Upload failed: %s' % output)
else:
break

View File

@ -47,10 +47,10 @@ def send_to_control_agent(params):
# Issue: [B310:blacklist] Audit url open for permitted schemes.
# Allowing use of file:/ or custom schemes is often unexpected.
# Reason for nosec: url is checked to be http further above.
resp = urllib.request.urlopen(req, context=ssl_ctx) # nosec B310
with urllib.request.urlopen(req, context=ssl_ctx) as resp: # nosec B310
# Now get the response details, put it in CAResponse and return it
result = CAResponse(resp.getcode(), resp.reason,
resp.read().decode("utf-8"))
# Now get the response details, put it in CAResponse and return it
result = CAResponse(resp.getcode(), resp.reason,
resp.read().decode("utf-8"))
return result
return result
return None

View File

@ -166,8 +166,8 @@ class CARequestUnitTest(unittest.TestCase):
user = 'libert\xe9'
password = '\xe9galit\xe9'
else:
user = u'libert\xe9'
password = u'\xe9galit\xe9'
user = 'libert\xe9'
password = '\xe9galit\xe9'
buser = user.encode('utf-8')
bpassword = password.encode('utf-8')
secret = b':'.join((buser, bpassword))

View File

@ -28,7 +28,7 @@ if len(sys.argv) != 3:
preproc = re.compile('^#')
constant = re.compile('^([a-zA-Z].*?[a-zA-Z_0-9]+)\\s*=.*;')
with open(filename_in) as file_in, open(filename_out, "w") as file_out:
with open(filename_in, encoding='utf-8') as file_in, open(filename_out, "w", encoding='utf-8') as file_out:
file_out.write("// This file is generated from " + filename_in + "\n" +
"// by the const2hdr.py script.\n" +
"// Do not edit, all changes will be lost.\n\n")

View File

@ -316,14 +316,14 @@ What you are expected to do is as follows:
examples.
"""
import argparse
import base64
import configparser
import re
import time
import socket
import sys
import base64
import time
from datetime import datetime
from optparse import OptionParser
re_hex = re.compile(r'^0x[0-9a-fA-F]+')
re_decimal = re.compile(r'^\d+$')
@ -334,11 +334,9 @@ dnssec_timefmt = '%Y%m%d%H%M%S'
dict_qr = {'query': 0, 'response': 1}
dict_opcode = {'query': 0, 'iquery': 1, 'status': 2, 'notify': 4,
'update': 5}
rdict_opcode = dict([(dict_opcode[k], k.upper()) for k in dict_opcode.keys()])
dict_rcode = {'noerror': 0, 'formerr': 1, 'servfail': 2, 'nxdomain': 3,
'notimp': 4, 'refused': 5, 'yxdomain': 6, 'yxrrset': 7,
'nxrrset': 8, 'notauth': 9, 'notzone': 10}
rdict_rcode = dict([(dict_rcode[k], k.upper()) for k in dict_rcode.keys()])
dict_rrtype = {'none': 0, 'a': 1, 'ns': 2, 'md': 3, 'mf': 4, 'cname': 5,
'soa': 6, 'mb': 7, 'mg': 8, 'mr': 9, 'null': 10,
'wks': 11, 'ptr': 12, 'hinfo': 13, 'minfo': 14, 'mx': 15,
@ -352,21 +350,25 @@ dict_rrtype = {'none': 0, 'a': 1, 'ns': 2, 'md': 3, 'mf': 4, 'cname': 5,
'spf': 99, 'unspec': 103, 'tkey': 249, 'tsig': 250,
'dlv': 32769, 'ixfr': 251, 'axfr': 252, 'mailb': 253,
'maila': 254, 'any': 255, 'caa': 257}
rdict_rrtype = dict([(dict_rrtype[k], k.upper()) for k in dict_rrtype.keys()])
dict_rrclass = {'in': 1, 'ch': 3, 'hs': 4, 'any': 255}
rdict_rrclass = dict([(dict_rrclass[k], k.upper()) for k in dict_rrclass.keys()])
dict_algorithm = {'rsamd5': 1, 'dh': 2, 'dsa': 3, 'ecc': 4,
'rsasha1': 5}
dict_algorithm = {'rsamd5': 1, 'dh': 2, 'dsa': 3, 'ecc': 4, 'rsasha1': 5}
dict_nsec3_algorithm = {'reserved': 0, 'sha1': 1}
rdict_algorithm = dict([(dict_algorithm[k], k.upper()) for k in dict_algorithm.keys()])
rdict_nsec3_algorithm = dict([(dict_nsec3_algorithm[k], k.upper()) for k in dict_nsec3_algorithm.keys()])
rdict_opcode = {k.upper(): v for k, v in dict_opcode.items()}
rdict_rcode = {k.upper(): v for k, v in dict_rcode.items()}
rdict_rrtype = {k.upper(): v for k, v in dict_rrtype.items()}
rdict_rrclass = {k.upper(): v for k, v in dict_rrclass.items()}
rdict_algorithm = {k.upper(): v for k, v in dict_algorithm.items()}
rdict_nsec3_algorithm = {k.upper(): v for k, v in dict_nsec3_algorithm.items()}
header_xtables = {'qr': dict_qr, 'opcode': dict_opcode,
'rcode': dict_rcode}
question_xtables = {'rrtype': dict_rrtype, 'rrclass': dict_rrclass}
def parse_value(value, xtable={}):
def parse_value(value, xtable=None):
if xtable is None:
xtable = {}
if re.search(re_hex, value):
return int(value, 16)
if re.search(re_decimal, value):
@ -380,9 +382,9 @@ def parse_value(value, xtable={}):
return value
def code_totext(code, dict):
if code in dict.keys():
return dict[code] + '(' + str(code) + ')'
def code_totext(code, dictionary):
if code in dictionary:
return dictionary[code] + '(' + str(code) + ')'
return str(code)
@ -395,7 +397,7 @@ def encode_name(name, absolute=True):
for label in labels:
if len(label) > 4 and label[0:4] == 'ptr=':
# special meta-syntax for compression pointer
wire += '%04x' % (0xc000 | int(l[4:]))
wire += '%04x' % (0xc000 | int(label[4:]))
break
if absolute or len(label) > 0:
wire += '%02x' % len(label)
@ -405,15 +407,15 @@ def encode_name(name, absolute=True):
return wire
def encode_string(name, len=None):
if type(name) is int and len is not None:
return '%0.*x' % (len * 2, name)
def encode_string(name, length=None):
if isinstance(name, int) and length is not None:
return '%0.*x' % (length * 2, name)
return ''.join(['%02x' % ord(ch) for ch in name])
def encode_bytes(name, len=None):
if type(name) is int and len is not None:
return '%0.*x' % (len * 2, name)
def encode_bytes(name, length=None):
if isinstance(name, int) and length is not None:
return '%0.*x' % (length * 2, name)
return ''.join(['%02x' % ch for ch in name])
@ -426,11 +428,13 @@ def count_namelabels(name):
return len(name.split('.'))
def get_config(config, section, configobj, xtables={}):
def get_config(config, section, configobj, xtables=None):
if xtables is None:
xtables = {}
try:
for field in config.options(section):
value = config.get(section, field)
if field in xtables.keys():
if field in xtables:
xtable = xtables[field]
else:
xtable = {}
@ -704,7 +708,8 @@ class AAAA(RR):
self.dump_header(f, self.rdlen)
f.write('# Address=%s\n' % (self.address))
bin_address = socket.inet_pton(socket.AF_INET6, self.address)
[f.write('%02x' % x) for x in bin_address]
for x in bin_address:
f.write('%02x' % x)
f.write('\n')
@ -959,6 +964,7 @@ class NSECBASE(RR):
block = 0
maplen = None # default bitmap length, auto-calculate
bitmap = '040000000003' # an arbitrarily chosen bitmap sample
nextname = None
def dump(self, f):
# first, construct the bitmap data
@ -994,6 +1000,17 @@ class NSECBASE(RR):
f.write('%02x %02x %s\n' %
(block_list[i], maplen_list[i], bitmap_list[i]))
def dump_fixedpart(self, f, bitmap_totallen):
name_wire = encode_name(self.nextname)
if self.rdlen is None:
# if rdlen needs to be calculated, it must be based on the bitmap
# length, because the configured maplen can be fake.
self.rdlen = int(len(name_wire) / 2) + bitmap_totallen
self.dump_header(f, self.rdlen)
f.write('# Next Name=%s (%d bytes)\n' % (self.nextname,
int(len(name_wire) / 2)))
f.write('%s\n' % name_wire)
class NSEC(NSECBASE):
'''Implements rendering NSEC RDATA in the test data format.
@ -1007,17 +1024,6 @@ class NSEC(NSECBASE):
nextname = 'next.example.com'
def dump_fixedpart(self, f, bitmap_totallen):
name_wire = encode_name(self.nextname)
if self.rdlen is None:
# if rdlen needs to be calculated, it must be based on the bitmap
# length, because the configured maplen can be fake.
self.rdlen = int(len(name_wire) / 2) + bitmap_totallen
self.dump_header(f, self.rdlen)
f.write('# Next Name=%s (%d bytes)\n' % (self.nextname,
int(len(name_wire) / 2)))
f.write('%s\n' % name_wire)
class NSEC3PARAM(RR):
'''Implements rendering NSEC3PARAM RDATA in the test data format.
@ -1146,9 +1152,9 @@ class RRSIG(RR):
self.rdlen = int(18 + len(name_wire) / 2 + len(str(sig_wire)) / 2)
self.dump_header(f, self.rdlen)
if type(self.covered) is str:
if isinstance(self.covered, str):
self.covered = dict_rrtype[self.covered.lower()]
if type(self.algorithm) is str:
if isinstance(self.algorithm, str):
self.algorithm = dict_algorithm[self.algorithm.lower()]
if self.labels is None:
self.labels = count_namelabels(self.signer)
@ -1345,7 +1351,7 @@ class TSIG(RR):
name_wire = encode_name(self.algorithm)
mac_size = self.mac_size
if mac_size is None:
if self.algorithm in self.dict_macsize.keys():
if self.algorithm in self.dict_macsize:
mac_size = self.dict_macsize[self.algorithm]
else:
raise RuntimeError('TSIG Mac Size cannot be determined')
@ -1426,7 +1432,7 @@ config_param = {'name': (Name, {}),
'header': (DNSHeader, header_xtables),
'question': (DNSQuestion, question_xtables),
'edns': (EDNS, {})}
for rrtype in dict_rrtype.keys():
for rrtype in dict_rrtype:
# For any supported RR types add the tuple of (RR_CLASS, {}).
# We expect KeyError as not all the types are supported, and simply
# ignore them.
@ -1448,18 +1454,18 @@ def get_config_param(section):
usage = 'usage: %prog [options] input_file'
if __name__ == "__main__":
parser = OptionParser(usage=usage)
parser.add_option('-o', '--output', action='store', dest='output',
default=None, metavar='FILE',
help='output file name [default: prefix of input_file]')
(options, args) = parser.parse_args()
def main():
parser = argparse.ArgumentParser(usage=usage)
parser.add_argument('-o', '--output', action='store', dest='output',
default=None, metavar='FILE',
help='output file name [default: prefix of input_file]')
args = parser.parse_args()
if len(args) == 0:
parser.error('input file is missing')
configfile = args[0]
outputfile = options.output
outputfile = args.output
if not outputfile:
m = re.match(r'(.*)\.[^.]+$', configfile)
if m:
@ -1468,12 +1474,12 @@ if __name__ == "__main__":
raise ValueError('output file is not specified and input file is not in the form of "output_file.suffix"')
# DeprecationWarning: use ConfigParser directly
config = configparser.SafeConfigParser()
config = configparser.SafeConfigParser() # pylint: disable=deprecated-class
config.read(configfile)
output = open(outputfile, 'w')
output = open(outputfile, 'w', encoding='utf-8') # pylint: disable=consider-using-with
print_header(output, configfile)
print_header(outputfile, configfile)
# First try the 'custom' mode; if it fails assume the query mode.
try:
@ -1488,3 +1494,7 @@ if __name__ == "__main__":
obj.dump(output)
output.close()
if __name__ == "__main__":
main()

View File

@ -305,9 +305,9 @@ def main(parameters):
# Only print if we have something to print to avoid a newline.
# Also don't clutter output with lines that doesn't cause CI failure if
# there are lines that cause CI failure.
if len(output_for_latest):
if len(output_for_latest) > 0:
print(output_for_latest)
elif len(output_for_other_than_latest):
elif len(output_for_other_than_latest) > 0:
print(output_for_other_than_latest)
# Only report errors on the latest upgrade script. For all other upgrade

View File

@ -15,7 +15,7 @@
# git pull
# git remote prune origin
#
# This script requires python 2.7 or 3.
# This script requires python 3.
#
# I have limited experience in Python. If things are done in a strange or
# uncommon way, there are no obscure reasons to do it that way, just plain
@ -23,9 +23,7 @@
#
# tomek
import string
import sys
from optparse import OptionParser
import argparse
# [B404:blacklist] Consider possible security implications associated with subprocess module.
import subprocess # nosec B404
@ -157,39 +155,31 @@ def check_output(cmd):
return subprocess.check_output(cmd) # nosec B603
def parse_args(args=sys.argv[1:], Parser=OptionParser):
def parse_args():
parser = argparse.ArgumentParser(
description="This script prints out merged and/or unmerged branches of a GIT tree.",
usage="""%prog
Lists all obsolete (fully merged into master) branches.
""")
parser = Parser(description="This script prints out merged and/or unmerged"
" branches of a GIT tree.")
parser.add_argument("-c", "--csv", action="store_true",
default=False, help="generates CSV output")
parser.add_argument("-u", "--unmerged", action="store_true",
default=False, help="lists unmerged branches")
parser.add_argument("-m", "--skip-merged", action="store_true",
default=False, help="omits listing merged branches")
parser.add_argument("-s", "--stats", action="store_true",
default=False, help="prints also statistics")
parser.add_option("-c", "--csv", action="store_true",
default=False, help="generates CSV output")
parser.add_option("-u", "--unmerged", action="store_true",
default=False, help="lists unmerged branches")
parser.add_option("-m", "--skip-merged", action="store_true",
default=False, help="omits listing merged branches")
parser.add_option("-s", "--stats", action="store_true",
default=False, help="prints also statistics")
(options, args) = parser.parse_args(args)
if args:
parser.print_help()
sys.exit(1)
return options
return parser.parse_args()
def main():
usage = """%prog
Lists all obsolete (fully merged into master) branches.
"""
options = parse_args()
csv = options.csv
merged = not options.skip_merged
unmerged = options.unmerged
stats = options.stats
args = parse_args()
csv = args.csv
merged = not args.skip_merged
unmerged = args.unmerged
stats = args.stats
if csv:
print("branch name,status,date,last commit(mail),last commit(name)")

View File

@ -1,14 +1,14 @@
#!/usr/bin/env python3
import argparse
from termcolor import colored, cprint
from io import StringIO
import json
import os
import re
import sqlalchemy as db
from sqlalchemy.sql import select
import sys
from io import StringIO
import sqlalchemy as db # pylint: disable=import-error
from termcolor import cprint # pylint: disable=import-error
def convert_to_db(entity_name, make_singular=True):
@ -86,6 +86,7 @@ class State:
class ConfigFile:
def __init__(self, filename):
self.config = {}
self.filename = filename
def load(self):
@ -93,7 +94,7 @@ class ConfigFile:
print('The all keys file %s does not exist.' % self.filename)
sys.exit(1)
with open(self.filename) as f:
with open(self.filename, encoding='utf-8') as f:
self.config = json.load(f)
f.close()
@ -232,17 +233,17 @@ def main():
sys.exit(1)
sanitized_contents = ''
f = open(args.all_keys_file)
for line in f:
sanitized_line = line.strip()
if not sanitized_line:
continue
with open(args.all_keys_file, encoding='utf-8') as f:
for line in f:
sanitized_line = line.strip()
if not sanitized_line:
continue
if sanitized_line.find('//') != -1 or sanitized_line.find('#') != -1:
continue
if sanitized_line.find('//') != -1 or sanitized_line.find('#') != -1:
continue
sanitized_line = sanitized_line.replace(': .', ': 0.')
sanitized_contents = sanitized_contents + sanitized_line
sanitized_line = sanitized_line.replace(': .', ': 0.')
sanitized_contents = sanitized_contents + sanitized_line
f.close()

View File

@ -160,7 +160,8 @@ def process_file(filename):
Parameters:
filename Name of the message file to process
"""
lines = open(filename).read().splitlines()
with open(filename, encoding='utf-8') as f:
lines = f.read().splitlines()
# Search for the first line starting with the percent character. Everything
# before it is considered the file header and is copied to the output with