diff --git a/utils/aa-notify b/utils/aa-notify index ac7b3c0df..11ed99850 100755 --- a/utils/aa-notify +++ b/utils/aa-notify @@ -53,7 +53,7 @@ import apparmor.update_profile as update_profile import LibAppArmor # C-library to parse one log line from apparmor.common import DebugLogger, open_file_read from apparmor.fail import enable_aa_exception_handler -from apparmor.notify import get_last_login_timestamp +from apparmor.notify import get_last_login_timestamp, get_event_special_type, set_userns_special_profile from apparmor.translations import init_translation from apparmor.logparser import ReadLog from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme, ProfileRules @@ -66,6 +66,9 @@ import threading gi.require_version('GLib', '2.0') +# setup module translations +_ = init_translation() + def get_user_login(): """Portable function to get username. @@ -449,22 +452,15 @@ def compile_filter_regex(filters): def can_allow_rule(ev, special_profiles): - if customized_message['userns']['cond'](ev, special_profiles): + ev_type = get_event_special_type(ev, special_profiles) + if ev_type != 'normal': + if ev['execpath'] is None: + return False return not aa.get_profile_filename_from_profile_name(ev['comm']) else: return aa.get_profile_filename_from_profile_name(ev['profile']) is not None -def is_special_profile_userns(ev, special_profiles): - if not special_profiles or ev['profile'] not in special_profiles: - return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns - - if 'execpath' not in ev or not ev['execpath']: - ev['execpath'] = aa.find_executable(ev['comm']) - - return True - - def create_userns_profile(name, path, ans): update_profile_path = update_profile.__file__ @@ -490,6 +486,8 @@ def create_userns_profile(name, path, ans): except subprocess.CalledProcessError as e: if e.returncode != 126: # return code 126 means the user cancelled the request UsernsGUI.show_error_cannot_reload_profile(profile_path, e.returncode) + else: + aa.update_profiles() def ask_for_user_ns_denied(path, name, interactive=True): @@ -508,8 +506,6 @@ def can_leverage_userns_event(ev): if ev['execpath'] is None: return 'error_cannot_find_path' - aa.update_profiles() - if aa.get_profile_filename_from_profile_name(ev['comm']): return 'error_userns_profile_exists' return 'ok' @@ -544,25 +540,35 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='') if value: out += '\t{} = {}\n'.format(_(key), value) - out += _('\nThe software that declined this operation is {}\n').format(ev['profile']) + if ev['aamode'] == 'REJECTING': + out += _('\nThe profile that denied this operation is {}\n').format(ev['profile']) + else: + out += _('\nThe profile that triggered this alert is {}\n').format(ev['profile']) rule = rl.create_rule_from_ev(ev) if rule: if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC: rule.exec_perms = 'Pix' - aa.update_profiles() - if customized_message['userns']['cond'](ev, special_profiles): - out += _('You may allow it through a dedicated unconfined profile for {}.').format(ev['comm']) + if get_event_special_type(ev, special_profiles) != 'normal': userns_event_usable = can_leverage_userns_event(ev) if userns_event_usable == 'error_cannot_find_path': - raw_rule = _('# You may allow it through a dedicated unconfined profile for {0}. However, apparmor cannot find {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm']) + raw_rule = _('# You may allow it through a dedicated unconfined profile for {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm']) elif userns_event_usable == 'error_userns_profile_exists': raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath']) elif userns_event_usable == 'ok': raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath']) + out += raw_rule[1:] else: raw_rule = rule.get_clean() + # TODO: This is brittle. Priority>1 might be needed. Also do we need to make the message show that we force allow? + if ev['profile'] in aa.active_profiles.profiles and aa.is_known_rule(aa.active_profiles.profiles[ev['profile']], rule.rule_name, rule): + rule.priority = 1 + raw_rule = "priority=1 " + raw_rule + if aa.is_known_rule(aa.active_profiles.profiles[ev['profile']], rule.rule_name, rule): + # TODO: Handle this edge case more gracefully + raw_rule = _('# aa-notify tried to add rule {}. However aa-notify is not allowed to override priority>0 rules. Please fix your profile manually.\n').format(raw_rule) + if profile_path: out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path) out += raw_rule @@ -570,7 +576,6 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='') out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir) else: # Should not happen out += _('ERROR: Could not create rule from event.') - return out, raw_rule @@ -589,7 +594,6 @@ def cb_more_info(notification, action, _args): if ans == 'add_rule': add_to_profile(raw_rule, ev['profile']) elif ans in {'allow', 'deny'}: - customized_message['userns']['cond'](ev, special_profiles) create_userns_profile(ev['comm'], ev['execpath'], ans) @@ -614,6 +618,8 @@ def add_to_profile(rule, profile_name): except subprocess.CalledProcessError as e: if e.returncode != 126: # return code 126 means the user cancelled the request ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show() + else: + aa.update_profiles() def create_from_file(file_path): @@ -624,6 +630,8 @@ def create_from_file(file_path): except subprocess.CalledProcessError as e: if e.returncode != 126: # return code 126 means the user cancelled the request ErrorGUI(_('Failed to add some rules'), False).show() + else: + aa.update_profiles() def allow_rules(clean_rules, allow_all=False): @@ -669,26 +677,29 @@ def cb_add_to_profile(notification, action, _args): ErrorGUI(_('ERROR: Could not create rule from event.'), False).show() return - aa.update_profiles() - - if customized_message['userns']['cond'](ev, special_profiles): + if get_event_special_type(ev, special_profiles) != 'normal': ask_for_user_ns_denied(ev['execpath'], ev['comm'], False) else: add_to_profile(rule.get_clean(), ev['profile']) customized_message = { - 'userns': { - 'cond': lambda ev, special_profiles: (ev['operation'] == 'userns_create' or ev['operation'] == 'capable') and is_special_profile_userns(ev, special_profiles), - 'msg': 'Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?' + 'userns_change_profile': { + 'msg': _('Application {0} is transited to special profile. Capabilities could be denied') + }, + 'userns_denied': { + 'msg': _('Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?') + }, + 'userns_capable': { + 'msg': _('Application {0} in special profile wanted to add a capability: ok?') } } def customize_notification_message(ev, msg, special_profiles): - if customized_message['userns']['cond'](ev, special_profiles): - msg = _(customized_message['userns']['msg']).format(ev['comm']) - + msg_type = get_event_special_type(ev, special_profiles) + if msg_type in customized_message: + msg = customized_message[msg_type]['msg'].format(ev['comm']) return msg @@ -712,7 +723,6 @@ def aggregate_event(agg, ev, keys_to_aggregate): def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles): notification = '' - summary = '' more_info = '' clean_rules = dict() summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys()))) @@ -740,10 +750,11 @@ def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles ev = data['events'][0] profile_name = ev['profile'] profile_path = aa.get_profile_filename_from_profile_name(profile_name) - is_userns_profile = customized_message['userns']['cond'](ev, special_profiles) - + is_userns_profile = get_event_special_type(ev, special_profiles) != 'normal' if is_userns_profile: bin_name = ev['comm'] + if 'execpath' not in ev: + ev['execpath'] = None bin_path = ev['execpath'] actionable = can_leverage_userns_event(ev) == 'ok' else: @@ -761,7 +772,7 @@ def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles rules_for_profiles.add(raw_rule) if rules_for_profiles != set(): - if profile not in special_profiles: + if not is_userns_profile: if profile_path is not None: clean_rules_name = _('profile {}:').format(profile) elif re_snap.match(profile): @@ -819,9 +830,6 @@ def main(): # setup exception handling enable_aa_exception_handler() - # setup module translations - _ = init_translation() - # Register the on_exit method with atexit # Takes care of closing the debug log etc atexit.register(aa.on_exit) @@ -1027,7 +1035,9 @@ def main(): userns_special_profiles = config['']['userns_special_profiles'].strip().split(',') else: # By default, unconfined and unprivileged_userns are the special profiles - userns_special_profiles = ['unconfined', 'unprivileged_userns'] + userns_special_profiles = ['unconfined', 'unprivileged_userns', 'unpriv_.*'] + # To support regexes + userns_special_profiles = set_userns_special_profile(userns_special_profiles) if 'ignore_denied_capability' in config['']: ignore_denied_capability = config['']['ignore_denied_capability'].strip().split(',') @@ -1156,10 +1166,14 @@ def main(): if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability: continue - # Special behaivor for userns: - if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles): - prompt_userns(ev) - continue # Notification already displayed for this event, we go to the next one. + # Special behavior for userns: + if get_event_special_type(ev, userns_special_profiles) != 'normal': + if 'execpath' not in ev: + ev['execpath'] = None + + if args.prompt_filter and 'userns' in args.prompt_filter: + prompt_userns(ev) + continue # Notification already displayed for this event, we go to the next one. # Notifications should not be run as root, since root probably is # the wrong desktop user and not the one getting the notifications. diff --git a/utils/aa-notify.pod b/utils/aa-notify.pod index fea8a25ee..df466f84c 100644 --- a/utils/aa-notify.pod +++ b/utils/aa-notify.pod @@ -89,8 +89,8 @@ System-wide configuration for B is done via # Set to 'no' to disable AppArmor notifications globally show_notifications="yes" - # Special profiles used to remove privileges for unconfined binaries using user namespaces. If unsure, leave as is. - userns_special_profiles="unconfined,unprivileged_userns" + # Special profiles used to remove privileges for unconfined binaries using user namespaces. Special profiles use Python's regular expression syntax. If unsure, leave as is. + userns_special_profiles="unconfined,unprivileged_userns,unpriv_.*" # Theme for aa-notify GUI. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes. interface_theme="ubuntu" diff --git a/utils/apparmor/gui.py b/utils/apparmor/gui.py index 3fbfc006a..1df1501af 100644 --- a/utils/apparmor/gui.py +++ b/utils/apparmor/gui.py @@ -1,6 +1,7 @@ import os import tkinter as tk import tkinter.ttk as ttk +import tkinter.font import subprocess import apparmor.aa as aa @@ -205,7 +206,7 @@ class ShowMoreGUIAggregated(GUI): def create_profile_rules_frame(self, parent, clean_rules): for profile_name, profile_rules in clean_rules.items(): - label = ttk.Label(parent, text=profile_name, font=tk.font.BOLD) + label = ttk.Label(parent, text=profile_name, font=tkinter.font.BOLD) label.pack(anchor='w', pady=(5, 0)) label.bind("", lambda event, rules=profile_rules: self.toggle_profile_rules(rules)) diff --git a/utils/apparmor/logparser.py b/utils/apparmor/logparser.py index adc061860..e3cfc2eea 100644 --- a/utils/apparmor/logparser.py +++ b/utils/apparmor/logparser.py @@ -113,7 +113,6 @@ class ReadLog: return log_entry def get_event_type(self, e): - if e['operation'] == 'exec': return 'file' elif e['class'] and e['class'] == 'namespace': @@ -131,6 +130,8 @@ class ReadLog: return 'pivot_root' elif e['class'] and e['class'] == 'net' and e['family'] and e['family'] == 'unix': return 'unix' + elif e['operation'] == 'change_onexec': + return 'change_profile' elif e['class'] == 'file' or self.op_type(e) == 'file': return 'file' elif e['operation'] == 'capable': @@ -160,6 +161,8 @@ class ReadLog: return None def create_rule_from_ev(self, ev): + if not ev: + return None event_type = self.get_event_type(ev) if not event_type: return None @@ -244,7 +247,7 @@ class ReadLog: elif event_type == 'io_uring': ev['peer_profile'] = event.peer_profile - elif event_type == 'capability': + elif event_type == 'capability' or ev['operation'] == 'change_onexec': ev['comm'] = event.comm if not ev['time']: diff --git a/utils/apparmor/notify.py b/utils/apparmor/notify.py index f4d883ced..a2f7a34fa 100644 --- a/utils/apparmor/notify.py +++ b/utils/apparmor/notify.py @@ -16,6 +16,7 @@ import os import struct import sqlite3 +import re from apparmor.common import AppArmorBug, DebugLogger @@ -129,3 +130,33 @@ def get_last_login_timestamp_wtmp(username, filename='/var/log/wtmp'): # When loop is done, last value should be the latest login timestamp return last_login + + +def is_special_profile_userns(ev, special_profiles): + if 'comm' not in ev: + return False # special profiles have a 'comm' entry + + if not special_profiles or not special_profiles.match(ev['profile']): + return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns + + return True + + +def get_event_special_type(ev, special_profiles): + if is_special_profile_userns(ev, special_profiles): + if ev['operation'] == 'userns_create': + if ev['aamode'] == 'REJECTING': + return 'userns_denied' + else: + return 'userns_change_profile' + elif ev['operation'] == 'change_onexec': + return 'userns_change_profile' + elif ev['operation'] == 'capable': + return 'userns_capable' + else: + raise AppArmorBug('unexpected operation: %s' % ev['operation']) + return 'normal' + + +def set_userns_special_profile(special_profiles): + return re.compile('^({})$'.format('|'.join(special_profiles))) diff --git a/utils/notify.conf b/utils/notify.conf index a6145c643..44b048e6c 100644 --- a/utils/notify.conf +++ b/utils/notify.conf @@ -11,8 +11,8 @@ # Set to 'no' to disable AppArmor notifications globally show_notifications="yes" -# Special profiles used to remove privileges for unconfined binaries using user namespaces. If unsure, leave as is. -userns_special_profiles="unconfined,unprivileged_userns" +# Special profiles used to remove privileges for unconfined binaries using user namespaces. Special profiles use Python's regular expression syntax. If unsure, leave as is. +userns_special_profiles="unconfined,unprivileged_userns,unpriv_.*" # Theme to use for aa-notify GUI themes. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes. interface_theme="ubuntu" diff --git a/utils/test/test-notify.py b/utils/test/test-notify.py index 4afd35439..69a94a716 100644 --- a/utils/test/test-notify.py +++ b/utils/test/test-notify.py @@ -12,7 +12,8 @@ import unittest from apparmor.common import AppArmorBug -from apparmor.notify import get_last_login_timestamp, get_last_login_timestamp_wtmp, sane_timestamp +from apparmor.notify import get_last_login_timestamp, get_last_login_timestamp_wtmp, sane_timestamp, get_event_special_type, set_userns_special_profile +from apparmor.logparser import ReadLog from common_test import AATest, setup_all_loops @@ -87,6 +88,36 @@ class TestGet_last_login_timestamp_wtmp(AATest): get_last_login_timestamp_wtmp('root', 'wtmp-examples/wtmp-x86_64-past') +class TestEventSpecialType(AATest): + userns_special_profiles = set_userns_special_profile(['unconfined', 'unprivileged_userns', 'unpriv_.*']) + parser = ReadLog('', '', '') + tests = ( + ('[ 176.385388] audit: type=1400 audit(1666891380.570:78): apparmor="DENIED" operation="userns_create" class="namespace" profile="/usr/bin/bwrap-userns-restrict" pid=1785 comm="userns_child_ex" requested="userns_create" denied="userns_create"', 'normal'), + ('[ 839.488169] audit: type=1400 audit(1752065668.819:208): apparmor="DENIED" operation="userns_create" class="namespace" info="Userns create restricted - failed to find unprivileged_userns profile" error=-13 profile="unconfined" pid=12124 comm="unshare" requested="userns_create" denied="userns_create" target="unprivileged_userns"', 'userns_denied'), + ('[ 429.272003] audit: type=1400 audit(1720613712.153:168): apparmor="AUDIT" operation="userns_create" class="namespace" info="Userns create - transitioning profile" profile="unconfined" pid=5630 comm="unshare" requested="userns_create" target="unprivileged_userns" execpath="/usr/bin/unshare"', 'userns_change_profile'), + ('[ 52.901383] audit: type=1400 audit(1752064882.228:82): apparmor="DENIED" operation="capable" class="cap" profile="unprivileged_userns" pid=6700 comm="electron" capability=21 capname="sys_admin"', 'userns_capable'), + ('Jul 31 17:11:16 dbusdev-saucy-amd64 dbus[1692]: apparmor="DENIED" operation="dbus_bind" bus="session" name="com.apparmor.Test" mask="bind" pid=2940 profile="/tmp/apparmor-2.8.0/tests/regression/apparmor/dbus_service"', 'normal'), + ('[103975.623545] audit: type=1400 audit(1481284511.494:2807): apparmor="DENIED" operation="change_onexec" info="no new privs" error=-1 namespace="root//lxd-tor_" profile="unconfined" name="system_tor" pid=18593 comm="(tor)" target="system_tor"', 'userns_change_profile'), + ('[78661.551820] audit: type=1400 audit(1752661047.170:350): apparmor="DENIED" operation="capable" class="cap" profile="unpriv_bwrap" pid=1412550 comm="node" capability=21 capname="sys_admin"', 'userns_capable'), + ) + + def _run_test(self, ev, expected): + parsed_event = self.parser.parse_event(ev) + r = self.parser.create_rule_from_ev(parsed_event) + self.assertIsNotNone(r) + + real_type = get_event_special_type(parsed_event, self.userns_special_profiles) + self.assertEqual(expected, real_type, + "ev {}: {} != {}".format(ev, expected, real_type)) + + def test_invalid(self): + ev = 'type=AVC msg=audit(1333698107.128:273917): apparmor="DENIED" operation="recvmsg" parent=1596 profile="unprivileged_userns" pid=1875 comm="nc" laddr=::ffff:127.0.0.1 lport=2048 faddr=::ffff:127.0.0.1 fport=59180 family="inet6" sock_type="stream" protocol=6' + parsed_event = self.parser.parse_event(ev) + parsed_event['comm'] = 'something' # Artificially crafted invalid event + with self.assertRaises(AppArmorBug): + get_event_special_type(parsed_event, self.userns_special_profiles) + + setup_all_loops(__name__) if __name__ == '__main__': unittest.main(verbosity=1)