diff --git a/utils/aa-notify b/utils/aa-notify index a4db7c9e2..d2394ccef 100755 --- a/utils/aa-notify +++ b/utils/aa-notify @@ -1,6 +1,6 @@ #! /usr/bin/python3 # ---------------------------------------------------------------------- -# Copyright (C) 2018–2019 Otto Kekäläinen +# Copyright (C) 2018–2022 Otto Kekäläinen # # This program is free software; you can redistribute it and/or # modify it under the terms of version 2 of the GNU General Public @@ -27,6 +27,8 @@ # In a typical desktop environment one would run as a service the # command: # /usr/bin/aa-notify -p -w 10 +# +"""Show AppArmor events on command line or as desktop notifications.""" import argparse import atexit @@ -50,9 +52,13 @@ from apparmor.notify import get_last_login_timestamp from apparmor.translations import init_translation + def get_user_login(): - """Portable function to get username. Should not trigger any - "OSError: [Errno 25] Inappropriate ioctl for device" errors in Giltab-CI""" + """Portable function to get username. + + Should not trigger any + "OSError: [Errno 25] Inappropriate ioctl for device" errors in Giltab-CI. + """ if os.name == "posix": username = pwd.getpwuid(os.geteuid()).pw_name else: @@ -63,6 +69,7 @@ def get_user_login(): def format_event(event, logsource): + """Generate the notification text contents.""" output = [] if 'message_body' in config['']: @@ -86,6 +93,7 @@ def format_event(event, logsource): def notify_about_new_entries(logfile, wait=0): + """Run the notification daemon in the background.""" # Kill other instances of aa-notify if already running for process in psutil.process_iter(): # Find the process that has the same name as this script, e.g. aa-notify.py @@ -102,7 +110,7 @@ def notify_about_new_entries(logfile, wait=0): try: for event in follow_apparmor_events(logfile, wait): debug_logger.info(format_event(event, logfile)) - yield(format_event(event, logfile)) + yield (format_event(event, logfile)) except PermissionError: sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile))) @@ -114,6 +122,7 @@ def notify_about_new_entries(logfile, wait=0): def show_entries_since_epoch(logfile, epoch_since): + """Show AppArmor notifications since given timestamp.""" count = 0 for event in get_apparmor_events(logfile, epoch_since): count += 1 @@ -137,6 +146,7 @@ def show_entries_since_epoch(logfile, epoch_since): def show_entries_since_last_login(logfile, username=get_user_login()): + """Show AppArmor notifications since last login of user.""" # If running as sudo, use username of sudo user instead of root if 'SUDO_USER' in os.environ.keys(): username = os.environ['SUDO_USER'] @@ -152,15 +162,15 @@ def show_entries_since_last_login(logfile, username=get_user_login()): def show_entries_since_days(logfile, since_days): - day_in_seconds = 60*60*24 + """Show AppArmor notifications since the given amount of days.""" + day_in_seconds = 60 * 60 * 24 epoch_now = int(time.time()) epoch_since = epoch_now - day_in_seconds * since_days show_entries_since_epoch(logfile, epoch_since) def follow_apparmor_events(logfile, wait=0): - """Follow AppArmor events and yield relevant entries until process stops""" - + """Follow AppArmor events and yield relevant entries until process stops.""" # If wait was given as argument but was type None (from ArgumentParser) # ensure it's type int and zero if not wait: @@ -239,7 +249,7 @@ def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size): def get_apparmor_events(logfile, since=0): - """Read audit events from log source and yield all relevant events""" + """Read audit events from log source and yield all relevant events.""" # Get logdata from file # @TODO Implement more log sources in addition to just the logfile @@ -253,11 +263,19 @@ def get_apparmor_events(logfile, since=0): def parse_logdata(logsource): - """Traverse any iterable log source and extract relevant AppArmor events""" + """Traverse any iterable log source and extract relevant AppArmor events. + + Expects log lines like: + Feb 16 20:22:28 XPS-13-9370 kernel: [520374.926882] audit: type=1400 + audit(1581877348.868:657): apparmor="ALLOWED" operation="open" + profile="libreoffice-soffice" + name="/usr/share/drirc.d/00-mesa-defaults.conf" pid=22690 + comm="soffice.bin" requested_mask="r" denied_mask="r" fsuid=1001 ouid=0 + """ re_audit_time_id = r'(msg=)?audit\([\d\.\:]+\):\s+' # 'audit(1282626827.320:411): ' re_kernel_time = r'\[[\d\.\s]+\]' # '[ 1612.746129]' - re_type_num = '1[45][0-9][0-9]' # 1400..1599 + re_type_num = '1[45][0-9][0-9]' # 1400..1599 re_aa_or_op = '(apparmor=|operation=)' re_log_parts = [ @@ -287,8 +305,11 @@ def parse_logdata(logsource): def drop_privileges(): - """If running as root, drop privileges to USER if known, or fall-back to nobody_user/group""" + """Drop privileges of process. + If running as root, drop privileges to USER if known, or fall-back to + nobody_user/group. + """ if os.geteuid() == 0: if 'SUDO_USER' in os.environ.keys(): @@ -315,8 +336,10 @@ def drop_privileges(): def raise_privileges(): - """If was running as user with saved user ID 0, raise back to root privileges""" + """Raise privileges of process. + If was running as user with saved user ID 0, raise back to root privileges. + """ if os.geteuid() != 0 and original_effective_user == 0: debug_logger.debug('Rasing privileges from UID {} back to UID 0 (root)'.format(os.geteuid())) @@ -326,6 +349,7 @@ def raise_privileges(): def read_notify_conf(path, shell_config): + """Read notify.conf.""" try: shell_config.CONF_DIR = path conf_dict = shell_config.read_config('notify.conf') @@ -336,11 +360,10 @@ def read_notify_conf(path, shell_config): def main(): - """ - Main function of aa-notify that parses command line - arguments and starts the requested operations. - """ + """Run aa-notify. + Parse command line arguments and starts the requested operations. + """ global _, debug_logger, config, args global debug_docs_url, nobody_user, original_effective_user, timeformat @@ -452,7 +475,9 @@ def main(): 'message_footer' ] found_config_keys = config[''].keys() - unknown_keys = [item for item in found_config_keys if item not in allowed_config_keys] + unknown_keys = [ + item for item in found_config_keys if item not in allowed_config_keys + ] for item in unknown_keys: print(_('Warning! Configuration item "{}" is unknown!').format(item))