mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-08-22 18:08:16 +00:00
339 lines
14 KiB
Python
Executable File
339 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Copyright (C) 2024 Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
import argparse
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
|
|
|
|
USAGE = """
|
|
This script does several verifications regarding logged messages:
|
|
1. Checks that messages are logged only once (outside of an exhonerated list).
|
|
2. Checks that no two messages share the same id.
|
|
3. Checks that there are no unlogged/unused messages.
|
|
4. Removes all occurences of unused messages (when run with -a).
|
|
5. Checks that the debug log level is correctly logged in the message documentation.
|
|
6. Automatically adds or fixes the debug log level in the message documentation (when run with -a).
|
|
7. Checks that the placeholder ids are consecutive, starting with 1, and unique in the same message definition.
|
|
"""
|
|
|
|
|
|
def check_duplicate_occurences(occurences):
|
|
exhonerated = {}
|
|
parent_dir = os.path.dirname(os.path.realpath(os.path.abspath(sys.argv[0])))
|
|
for exh_txt in [
|
|
f'{parent_dir}/exhonerated-duplicate-messages.txt',
|
|
f'{parent_dir}/../premium/tools/exhonerated-duplicate-messages.txt',
|
|
]:
|
|
if pathlib.Path(exh_txt).is_file():
|
|
with open(exh_txt, 'r', encoding='utf-8') as f:
|
|
lines = f.read().splitlines()
|
|
for line in lines:
|
|
message_id = line.split('%')[1].split(':')[0].strip()
|
|
max_allowed = line.split(':')[1].strip()
|
|
exhonerated[message_id] = int(max_allowed)
|
|
|
|
failure = False
|
|
duplicate_occurences = {k: v for k, v in occurences.items() if v > 1}
|
|
for k, v in duplicate_occurences.items():
|
|
if k in exhonerated and v <= exhonerated[k]:
|
|
continue
|
|
if not failure: # in other words: if first
|
|
print('Duplicate occurences found:')
|
|
failure = True
|
|
print(f' % {k}: {v}')
|
|
return failure
|
|
|
|
|
|
def check_unlogged_messages(messages, autofix):
|
|
all_source_files = set(pathlib.Path('.').glob('**/*.cc')) \
|
|
- set(pathlib.Path('.').glob('**/*messages.cc')) \
|
|
| set(pathlib.Path('.').glob('**/*.h')) \
|
|
- set(pathlib.Path('.').glob('**/*messages.h'))
|
|
all_source_code = ''
|
|
for file in all_source_files:
|
|
with open(file, 'r', encoding='utf-8') as f:
|
|
all_source_code += f.read()
|
|
failure = False
|
|
for message_id in messages:
|
|
if message_id not in all_source_code:
|
|
if not failure: # in other words: if first
|
|
print('Unlogged messages found:')
|
|
failure = True
|
|
print(f' % {message_id}')
|
|
if autofix:
|
|
remove_message_definition(message_id, messages[message_id]['file'])
|
|
print(' ^ autofixed')
|
|
return failure
|
|
|
|
|
|
# This function is deprecated. Replaced by check_unlogged_messages.
|
|
# Messages can appear outside LOG_* function calls.
|
|
# So checking occurences is not enough.
|
|
def check_unlogged_messages_based_on_occurences(messages, occurences, autofix):
|
|
failure = False
|
|
for message_id in messages:
|
|
if message_id not in occurences:
|
|
if not failure: # in other words: if first
|
|
print('Unlogged messages found:')
|
|
failure = True
|
|
print(f' % {message_id}')
|
|
if autofix:
|
|
remove_message_definition(message_id, messages[message_id]['file'])
|
|
print(' ^ autofixed')
|
|
return failure
|
|
|
|
|
|
def check_that_debug_log_levels_are_documented(messages, debug_levels, log_lines, autofix):
|
|
failure = False
|
|
for message_id, message in messages.items():
|
|
log_level = None
|
|
for line in log_lines:
|
|
if line.startswith('LOG_DEBUG') and message_id in line:
|
|
log_level = line.split(',')[1].strip().replace('isc::log::', '').replace('log::', '')
|
|
break
|
|
if log_level is None:
|
|
continue
|
|
if not log_level.isdigit():
|
|
log_level = debug_levels[log_level]
|
|
if isinstance(log_level, str):
|
|
log_level = int(log_level)
|
|
if not isinstance(log_level, int):
|
|
print(f'Could not determine numerical log level of {message_id}. Supposedly {log_level}?')
|
|
failure = True
|
|
if message['debug_log_level_line'] != f'Logged at debug log level {log_level}.':
|
|
if not failure: # in other words: if first
|
|
print('Messages that do not document their debug log levels:')
|
|
failure = True
|
|
print(f' % {message_id}: {message["debug_log_level_line"]}')
|
|
print(f' % It should be: Logged at debug log level {log_level}.')
|
|
if autofix:
|
|
file = message['file']
|
|
# If line is already there, remove it.
|
|
if message['debug_log_level_line'].startswith('Logged at debug log level '):
|
|
line_number = run(fr'grep -En "^% \b{message_id}\b" "{file}" | cut -d ":" -f 1')
|
|
line_number = int(line_number) + 1
|
|
run(f'sed "{line_number}d" "{file}" > "{file}.tmp"')
|
|
run(f'mv "{file}.tmp" "{file}"')
|
|
# And add the right one.
|
|
run(f'sed "/^% {message_id} /a Logged at debug log level {log_level}." "{file}" > "{file}.tmp"')
|
|
run(f'mv "{file}.tmp" "{file}"')
|
|
print(' ^ autofixed')
|
|
return failure
|
|
|
|
|
|
def check_placeholder_ids(messages):
|
|
failure = False
|
|
placeholder_id_pattern = re.compile('(%[0-9]+)')
|
|
for message_id in messages:
|
|
text = messages[message_id]['text']
|
|
matches = placeholder_id_pattern.search(text)
|
|
if matches is not None:
|
|
for i in range(len(matches.groups())):
|
|
match = matches.group(i)
|
|
if match != f'%{i + 1}':
|
|
print(f'Expected %{i + 1} but found {match} for message:')
|
|
print(f' % {message_id} {text}')
|
|
failure = True
|
|
return failure
|
|
|
|
|
|
def remove_message_definition(message, file):
|
|
new_lines = []
|
|
removing = False
|
|
with open(file, 'r', encoding='utf-8') as f:
|
|
lines = f.read().splitlines()
|
|
for line in lines:
|
|
if line.startswith(f'% {message}'):
|
|
removing = True
|
|
elif not removing:
|
|
new_lines.append(line)
|
|
elif len(line) == 0:
|
|
removing = False
|
|
with open(file, 'w', encoding='utf-8') as f:
|
|
for i in new_lines:
|
|
f.write(i)
|
|
f.write('\n')
|
|
|
|
|
|
def run(command):
|
|
''' Executes a shell command and returns its output.
|
|
|
|
:param command: the command to be executed
|
|
:type command: str
|
|
|
|
:return: the standard output from the command
|
|
:type: str
|
|
'''
|
|
if 'DEBUG' in os.environ:
|
|
print(f'> {command}')
|
|
# Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified, security
|
|
# issue.
|
|
with subprocess.Popen(command, encoding='utf-8', shell=True, # nosec B602
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
|
output, error = p.communicate()
|
|
if error:
|
|
print('ERROR:', error, file=sys.stderr)
|
|
sys.exit(2)
|
|
return output.strip()
|
|
|
|
|
|
def main():
|
|
# Parse parameters.
|
|
parser = argparse.ArgumentParser(description=USAGE,
|
|
formatter_class=argparse.RawTextHelpFormatter)
|
|
parser.add_argument('-a', '--autofix', action='store_true',
|
|
help='Autofix unused messages and debug log levels in docs.')
|
|
args = parser.parse_args()
|
|
|
|
# Initializations
|
|
failure = False
|
|
debug_levels = {}
|
|
log_lines = []
|
|
messages = {}
|
|
occurences = {}
|
|
debug_level_pattern = re.compile(r'^(extern |)const int (.*DBG.*) =(.*)$')
|
|
message_id_pattern = re.compile(r'^% (\w+) (.*)')
|
|
log_pattern = re.compile(r'\b(LOG_DEBUG|LOG_ERROR|LOG_FATAL|LOG_INFO|LOG_WARN)\(')
|
|
|
|
# Process .mes files.
|
|
mes_files = sorted(pathlib.Path('.').glob('**/*.mes'))
|
|
for mes_file in mes_files:
|
|
with open(mes_file, 'r', encoding='utf-8') as f:
|
|
current_message_id = None
|
|
lines = f.read().splitlines()
|
|
for line in lines:
|
|
if len(line) == 0:
|
|
current_message_id = None
|
|
|
|
message_id_matches = message_id_pattern.search(line)
|
|
if message_id_matches is None:
|
|
# Could be message description.
|
|
if current_message_id is not None:
|
|
if 'debug_log_level_line' in messages[current_message_id]:
|
|
# If debug log level line is already there, that's all that concerns us, so unset
|
|
# current_message_id so that we ignore future lines.
|
|
current_message_id = None
|
|
else:
|
|
messages[current_message_id]['debug_log_level_line'] = line
|
|
else:
|
|
# Message definition
|
|
message_id = message_id_matches.group(1)
|
|
message_text = message_id_matches.group(2)
|
|
|
|
# 2. Checks that no two messages share the same id.
|
|
if message_id in messages:
|
|
print(f'Duplicate message id definition: {message_id}. Check in both core and premium.')
|
|
failure = True
|
|
|
|
current_message_id = message_id
|
|
messages[message_id] = {
|
|
'file': mes_file,
|
|
'text': message_text,
|
|
}
|
|
|
|
# Process .cc and .h files.
|
|
cc_files = sorted(pathlib.Path('.').glob('**/*.cc'))
|
|
h_files = sorted(pathlib.Path('.').glob('**/*.h'))
|
|
cpp_files = cc_files + h_files
|
|
for cpp_file in cpp_files:
|
|
# Skip test files.
|
|
if any(i in cpp_file.parts for i in ['tests', 'testutils', 'unittests']):
|
|
continue
|
|
|
|
with open(cpp_file, 'r', encoding='utf-8') as f:
|
|
lines = f.read().splitlines()
|
|
current_log_line = ''
|
|
current_debug_level = ''
|
|
for line in lines:
|
|
line = line.strip()
|
|
|
|
if len(current_debug_level) != 0:
|
|
debug_levels[current_debug_level] = (line.strip().rstrip(';')
|
|
.replace('isc::log::', '').replace('log::', ''))
|
|
current_debug_level = ''
|
|
|
|
matches = debug_level_pattern.search(line)
|
|
if matches is not None:
|
|
level = matches.group(2)
|
|
reference = matches.group(3)
|
|
if level is not None:
|
|
if len(reference) == 0:
|
|
current_debug_level = level
|
|
else:
|
|
debug_levels[level] = (reference.strip().rstrip(';')
|
|
.replace('isc::log::', '').replace('log::', ''))
|
|
|
|
if len(current_log_line) == 0:
|
|
matches = log_pattern.search(line)
|
|
if matches is not None:
|
|
if not line.startswith('//'):
|
|
current_log_line = line
|
|
else:
|
|
continue
|
|
else:
|
|
current_log_line += line.strip()
|
|
if current_log_line.endswith(';'):
|
|
log_lines.append(current_log_line)
|
|
current_log_line = ''
|
|
log_lines = sorted(log_lines)
|
|
|
|
# Resolve all debug_levels to numbers.
|
|
finished = False
|
|
while not finished:
|
|
finished = True
|
|
for level, reference in debug_levels.items():
|
|
if reference.isdigit():
|
|
continue
|
|
for i in reference.split(' '):
|
|
if i in debug_levels:
|
|
if debug_levels[i].isdigit():
|
|
debug_levels[level] = debug_levels[level].replace(i, debug_levels[i])
|
|
finished = False
|
|
if finished: # in other words, if no replacement was done, so if all replacements were already done, then:
|
|
debug_levels[level] = eval(reference) # pylint: disable=eval-used
|
|
for level in debug_levels:
|
|
debug_levels[level] = int(debug_levels[level])
|
|
|
|
# Get number of occurences for each message id.
|
|
for line in log_lines:
|
|
pos = 1
|
|
if line.split('(')[0] == 'LOG_DEBUG':
|
|
pos = 2
|
|
message_id = line.split(',')[pos]
|
|
message_id = message_id.split(')')[0]
|
|
message_id = message_id.strip()
|
|
if message_id in occurences:
|
|
occurences[message_id] += 1
|
|
else:
|
|
occurences[message_id] = 1
|
|
|
|
# 1. Checks that messages are logged only once.
|
|
failure |= check_duplicate_occurences(occurences)
|
|
|
|
# 3. Checks that there are no unlogged/unused messages.
|
|
# 4. Removes all occurences of unused messages (when run with -a).
|
|
failure |= check_unlogged_messages(messages, args.autofix)
|
|
|
|
# 5. Checks that the debug log level is correctly logged in the message documentation.
|
|
# 6. Automatically adds or fixes the debug log level in the message documentation (when run with -a).
|
|
failure |= check_that_debug_log_levels_are_documented(messages, debug_levels, log_lines, args.autofix)
|
|
|
|
# 7. Checks that the placeholder ids are consecutive, starting with 1, and unique in the same message definition.
|
|
failure |= check_placeholder_ids(messages)
|
|
|
|
if failure:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|