2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-31 14:25:52 +00:00

Format aa-notify to follow PEP-8

Update (most of the) code and inline comments/docstrings to follow
https://peps.python.org/pep-0008/ so that future maintenance is slightly
easier.

Continue to keep long lines as splitting them does not always improve
the code readability.
This commit is contained in:
Otto Kekäläinen
2023-03-26 11:26:59 -07:00
parent c1a1a3a923
commit fff72ed4c4

View File

@@ -1,6 +1,6 @@
#! /usr/bin/python3
# ----------------------------------------------------------------------
# Copyright (C) 20182019 Otto Kekäläinen <otto@kekalainen.net>
# Copyright (C) 20182022 Otto Kekäläinen <otto@kekalainen.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
@@ -27,6 +27,8 @@
# In a typical desktop environment one would run as a service the
# command:
# /usr/bin/aa-notify -p -w 10
#
"""Show AppArmor events on command line or as desktop notifications."""
import argparse
import atexit
@@ -50,9 +52,13 @@ from apparmor.notify import get_last_login_timestamp
from apparmor.translations import init_translation
def get_user_login():
"""Portable function to get username. Should not trigger any
"OSError: [Errno 25] Inappropriate ioctl for device" errors in Giltab-CI"""
"""Portable function to get username.
Should not trigger any
"OSError: [Errno 25] Inappropriate ioctl for device" errors in Giltab-CI.
"""
if os.name == "posix":
username = pwd.getpwuid(os.geteuid()).pw_name
else:
@@ -63,6 +69,7 @@ def get_user_login():
def format_event(event, logsource):
"""Generate the notification text contents."""
output = []
if 'message_body' in config['']:
@@ -86,6 +93,7 @@ def format_event(event, logsource):
def notify_about_new_entries(logfile, wait=0):
"""Run the notification daemon in the background."""
# Kill other instances of aa-notify if already running
for process in psutil.process_iter():
# Find the process that has the same name as this script, e.g. aa-notify.py
@@ -102,7 +110,7 @@ def notify_about_new_entries(logfile, wait=0):
try:
for event in follow_apparmor_events(logfile, wait):
debug_logger.info(format_event(event, logfile))
yield(format_event(event, logfile))
yield (format_event(event, logfile))
except PermissionError:
sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile)))
@@ -114,6 +122,7 @@ def notify_about_new_entries(logfile, wait=0):
def show_entries_since_epoch(logfile, epoch_since):
"""Show AppArmor notifications since given timestamp."""
count = 0
for event in get_apparmor_events(logfile, epoch_since):
count += 1
@@ -137,6 +146,7 @@ def show_entries_since_epoch(logfile, epoch_since):
def show_entries_since_last_login(logfile, username=get_user_login()):
"""Show AppArmor notifications since last login of user."""
# If running as sudo, use username of sudo user instead of root
if 'SUDO_USER' in os.environ.keys():
username = os.environ['SUDO_USER']
@@ -152,15 +162,15 @@ def show_entries_since_last_login(logfile, username=get_user_login()):
def show_entries_since_days(logfile, since_days):
day_in_seconds = 60*60*24
"""Show AppArmor notifications since the given amount of days."""
day_in_seconds = 60 * 60 * 24
epoch_now = int(time.time())
epoch_since = epoch_now - day_in_seconds * since_days
show_entries_since_epoch(logfile, epoch_since)
def follow_apparmor_events(logfile, wait=0):
"""Follow AppArmor events and yield relevant entries until process stops"""
"""Follow AppArmor events and yield relevant entries until process stops."""
# If wait was given as argument but was type None (from ArgumentParser)
# ensure it's type int and zero
if not wait:
@@ -239,7 +249,7 @@ def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size):
def get_apparmor_events(logfile, since=0):
"""Read audit events from log source and yield all relevant events"""
"""Read audit events from log source and yield all relevant events."""
# Get logdata from file
# @TODO Implement more log sources in addition to just the logfile
@@ -253,11 +263,19 @@ def get_apparmor_events(logfile, since=0):
def parse_logdata(logsource):
"""Traverse any iterable log source and extract relevant AppArmor events"""
"""Traverse any iterable log source and extract relevant AppArmor events.
Expects log lines like:
Feb 16 20:22:28 XPS-13-9370 kernel: [520374.926882] audit: type=1400
audit(1581877348.868:657): apparmor="ALLOWED" operation="open"
profile="libreoffice-soffice"
name="/usr/share/drirc.d/00-mesa-defaults.conf" pid=22690
comm="soffice.bin" requested_mask="r" denied_mask="r" fsuid=1001 ouid=0
"""
re_audit_time_id = r'(msg=)?audit\([\d\.\:]+\):\s+' # 'audit(1282626827.320:411): '
re_kernel_time = r'\[[\d\.\s]+\]' # '[ 1612.746129]'
re_type_num = '1[45][0-9][0-9]' # 1400..1599
re_type_num = '1[45][0-9][0-9]' # 1400..1599
re_aa_or_op = '(apparmor=|operation=)'
re_log_parts = [
@@ -287,8 +305,11 @@ def parse_logdata(logsource):
def drop_privileges():
"""If running as root, drop privileges to USER if known, or fall-back to nobody_user/group"""
"""Drop privileges of process.
If running as root, drop privileges to USER if known, or fall-back to
nobody_user/group.
"""
if os.geteuid() == 0:
if 'SUDO_USER' in os.environ.keys():
@@ -315,8 +336,10 @@ def drop_privileges():
def raise_privileges():
"""If was running as user with saved user ID 0, raise back to root privileges"""
"""Raise privileges of process.
If was running as user with saved user ID 0, raise back to root privileges.
"""
if os.geteuid() != 0 and original_effective_user == 0:
debug_logger.debug('Rasing privileges from UID {} back to UID 0 (root)'.format(os.geteuid()))
@@ -326,6 +349,7 @@ def raise_privileges():
def read_notify_conf(path, shell_config):
"""Read notify.conf."""
try:
shell_config.CONF_DIR = path
conf_dict = shell_config.read_config('notify.conf')
@@ -336,11 +360,10 @@ def read_notify_conf(path, shell_config):
def main():
"""
Main function of aa-notify that parses command line
arguments and starts the requested operations.
"""
"""Run aa-notify.
Parse command line arguments and starts the requested operations.
"""
global _, debug_logger, config, args
global debug_docs_url, nobody_user, original_effective_user, timeformat
@@ -452,7 +475,9 @@ def main():
'message_footer'
]
found_config_keys = config[''].keys()
unknown_keys = [item for item in found_config_keys if item not in allowed_config_keys]
unknown_keys = [
item for item in found_config_keys if item not in allowed_config_keys
]
for item in unknown_keys:
print(_('Warning! Configuration item "{}" is unknown!').format(item))