mirror of
https://gitlab.com/apparmor/apparmor
synced 2025-08-22 10:07:12 +00:00
Merge aa-notify: reduce the likelihood of misuses
This MR removes some footguns in aa-notify - Prevents the modification of special profiles - Improve the clarity of messages - Add support for regexes in userns_special_profiles - Refactor get_event_type. - Add support for regexes for special profiles - Optimize aa-notify performances - Minor bugfixes Signed-off-by: Maxime Bélair <maxime.belair@canonical.com> MR: https://gitlab.com/apparmor/apparmor/-/merge_requests/1732 Approved-by: Christian Boltz <apparmor@cboltz.de> Merged-by: Maxime Bélair <maxime.belair@canonical.com>
This commit is contained in:
commit
e82ee9f4f4
@ -53,7 +53,7 @@ import apparmor.update_profile as update_profile
|
|||||||
import LibAppArmor # C-library to parse one log line
|
import LibAppArmor # C-library to parse one log line
|
||||||
from apparmor.common import DebugLogger, open_file_read
|
from apparmor.common import DebugLogger, open_file_read
|
||||||
from apparmor.fail import enable_aa_exception_handler
|
from apparmor.fail import enable_aa_exception_handler
|
||||||
from apparmor.notify import get_last_login_timestamp
|
from apparmor.notify import get_last_login_timestamp, get_event_special_type, set_userns_special_profile
|
||||||
from apparmor.translations import init_translation
|
from apparmor.translations import init_translation
|
||||||
from apparmor.logparser import ReadLog
|
from apparmor.logparser import ReadLog
|
||||||
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme, ProfileRules
|
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme, ProfileRules
|
||||||
@ -66,6 +66,9 @@ import threading
|
|||||||
|
|
||||||
gi.require_version('GLib', '2.0')
|
gi.require_version('GLib', '2.0')
|
||||||
|
|
||||||
|
# setup module translations
|
||||||
|
_ = init_translation()
|
||||||
|
|
||||||
|
|
||||||
def get_user_login():
|
def get_user_login():
|
||||||
"""Portable function to get username.
|
"""Portable function to get username.
|
||||||
@ -449,22 +452,15 @@ def compile_filter_regex(filters):
|
|||||||
|
|
||||||
|
|
||||||
def can_allow_rule(ev, special_profiles):
|
def can_allow_rule(ev, special_profiles):
|
||||||
if customized_message['userns']['cond'](ev, special_profiles):
|
ev_type = get_event_special_type(ev, special_profiles)
|
||||||
|
if ev_type != 'normal':
|
||||||
|
if ev['execpath'] is None:
|
||||||
|
return False
|
||||||
return not aa.get_profile_filename_from_profile_name(ev['comm'])
|
return not aa.get_profile_filename_from_profile_name(ev['comm'])
|
||||||
else:
|
else:
|
||||||
return aa.get_profile_filename_from_profile_name(ev['profile']) is not None
|
return aa.get_profile_filename_from_profile_name(ev['profile']) is not None
|
||||||
|
|
||||||
|
|
||||||
def is_special_profile_userns(ev, special_profiles):
|
|
||||||
if not special_profiles or ev['profile'] not in special_profiles:
|
|
||||||
return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns
|
|
||||||
|
|
||||||
if 'execpath' not in ev or not ev['execpath']:
|
|
||||||
ev['execpath'] = aa.find_executable(ev['comm'])
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def create_userns_profile(name, path, ans):
|
def create_userns_profile(name, path, ans):
|
||||||
update_profile_path = update_profile.__file__
|
update_profile_path = update_profile.__file__
|
||||||
|
|
||||||
@ -490,6 +486,8 @@ def create_userns_profile(name, path, ans):
|
|||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
if e.returncode != 126: # return code 126 means the user cancelled the request
|
if e.returncode != 126: # return code 126 means the user cancelled the request
|
||||||
UsernsGUI.show_error_cannot_reload_profile(profile_path, e.returncode)
|
UsernsGUI.show_error_cannot_reload_profile(profile_path, e.returncode)
|
||||||
|
else:
|
||||||
|
aa.update_profiles()
|
||||||
|
|
||||||
|
|
||||||
def ask_for_user_ns_denied(path, name, interactive=True):
|
def ask_for_user_ns_denied(path, name, interactive=True):
|
||||||
@ -508,8 +506,6 @@ def can_leverage_userns_event(ev):
|
|||||||
if ev['execpath'] is None:
|
if ev['execpath'] is None:
|
||||||
return 'error_cannot_find_path'
|
return 'error_cannot_find_path'
|
||||||
|
|
||||||
aa.update_profiles()
|
|
||||||
|
|
||||||
if aa.get_profile_filename_from_profile_name(ev['comm']):
|
if aa.get_profile_filename_from_profile_name(ev['comm']):
|
||||||
return 'error_userns_profile_exists'
|
return 'error_userns_profile_exists'
|
||||||
return 'ok'
|
return 'ok'
|
||||||
@ -544,25 +540,35 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='')
|
|||||||
if value:
|
if value:
|
||||||
out += '\t{} = {}\n'.format(_(key), value)
|
out += '\t{} = {}\n'.format(_(key), value)
|
||||||
|
|
||||||
out += _('\nThe software that declined this operation is {}\n').format(ev['profile'])
|
if ev['aamode'] == 'REJECTING':
|
||||||
|
out += _('\nThe profile that denied this operation is {}\n').format(ev['profile'])
|
||||||
|
else:
|
||||||
|
out += _('\nThe profile that triggered this alert is {}\n').format(ev['profile'])
|
||||||
|
|
||||||
rule = rl.create_rule_from_ev(ev)
|
rule = rl.create_rule_from_ev(ev)
|
||||||
|
|
||||||
if rule:
|
if rule:
|
||||||
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
|
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
|
||||||
rule.exec_perms = 'Pix'
|
rule.exec_perms = 'Pix'
|
||||||
aa.update_profiles()
|
if get_event_special_type(ev, special_profiles) != 'normal':
|
||||||
if customized_message['userns']['cond'](ev, special_profiles):
|
|
||||||
out += _('You may allow it through a dedicated unconfined profile for {}.').format(ev['comm'])
|
|
||||||
userns_event_usable = can_leverage_userns_event(ev)
|
userns_event_usable = can_leverage_userns_event(ev)
|
||||||
if userns_event_usable == 'error_cannot_find_path':
|
if userns_event_usable == 'error_cannot_find_path':
|
||||||
raw_rule = _('# You may allow it through a dedicated unconfined profile for {0}. However, apparmor cannot find {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm'])
|
raw_rule = _('# You may allow it through a dedicated unconfined profile for {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm'])
|
||||||
elif userns_event_usable == 'error_userns_profile_exists':
|
elif userns_event_usable == 'error_userns_profile_exists':
|
||||||
raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath'])
|
raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath'])
|
||||||
elif userns_event_usable == 'ok':
|
elif userns_event_usable == 'ok':
|
||||||
raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath'])
|
raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath'])
|
||||||
|
out += raw_rule[1:]
|
||||||
else:
|
else:
|
||||||
raw_rule = rule.get_clean()
|
raw_rule = rule.get_clean()
|
||||||
|
# TODO: This is brittle. Priority>1 might be needed. Also do we need to make the message show that we force allow?
|
||||||
|
if ev['profile'] in aa.active_profiles.profiles and aa.is_known_rule(aa.active_profiles.profiles[ev['profile']], rule.rule_name, rule):
|
||||||
|
rule.priority = 1
|
||||||
|
raw_rule = "priority=1 " + raw_rule
|
||||||
|
if aa.is_known_rule(aa.active_profiles.profiles[ev['profile']], rule.rule_name, rule):
|
||||||
|
# TODO: Handle this edge case more gracefully
|
||||||
|
raw_rule = _('# aa-notify tried to add rule {}. However aa-notify is not allowed to override priority>0 rules. Please fix your profile manually.\n').format(raw_rule)
|
||||||
|
|
||||||
if profile_path:
|
if profile_path:
|
||||||
out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path)
|
out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path)
|
||||||
out += raw_rule
|
out += raw_rule
|
||||||
@ -570,7 +576,6 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='')
|
|||||||
out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir)
|
out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir)
|
||||||
else: # Should not happen
|
else: # Should not happen
|
||||||
out += _('ERROR: Could not create rule from event.')
|
out += _('ERROR: Could not create rule from event.')
|
||||||
|
|
||||||
return out, raw_rule
|
return out, raw_rule
|
||||||
|
|
||||||
|
|
||||||
@ -589,7 +594,6 @@ def cb_more_info(notification, action, _args):
|
|||||||
if ans == 'add_rule':
|
if ans == 'add_rule':
|
||||||
add_to_profile(raw_rule, ev['profile'])
|
add_to_profile(raw_rule, ev['profile'])
|
||||||
elif ans in {'allow', 'deny'}:
|
elif ans in {'allow', 'deny'}:
|
||||||
customized_message['userns']['cond'](ev, special_profiles)
|
|
||||||
create_userns_profile(ev['comm'], ev['execpath'], ans)
|
create_userns_profile(ev['comm'], ev['execpath'], ans)
|
||||||
|
|
||||||
|
|
||||||
@ -614,6 +618,8 @@ def add_to_profile(rule, profile_name):
|
|||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
if e.returncode != 126: # return code 126 means the user cancelled the request
|
if e.returncode != 126: # return code 126 means the user cancelled the request
|
||||||
ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show()
|
ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show()
|
||||||
|
else:
|
||||||
|
aa.update_profiles()
|
||||||
|
|
||||||
|
|
||||||
def create_from_file(file_path):
|
def create_from_file(file_path):
|
||||||
@ -624,6 +630,8 @@ def create_from_file(file_path):
|
|||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
if e.returncode != 126: # return code 126 means the user cancelled the request
|
if e.returncode != 126: # return code 126 means the user cancelled the request
|
||||||
ErrorGUI(_('Failed to add some rules'), False).show()
|
ErrorGUI(_('Failed to add some rules'), False).show()
|
||||||
|
else:
|
||||||
|
aa.update_profiles()
|
||||||
|
|
||||||
|
|
||||||
def allow_rules(clean_rules, allow_all=False):
|
def allow_rules(clean_rules, allow_all=False):
|
||||||
@ -669,26 +677,29 @@ def cb_add_to_profile(notification, action, _args):
|
|||||||
ErrorGUI(_('ERROR: Could not create rule from event.'), False).show()
|
ErrorGUI(_('ERROR: Could not create rule from event.'), False).show()
|
||||||
return
|
return
|
||||||
|
|
||||||
aa.update_profiles()
|
if get_event_special_type(ev, special_profiles) != 'normal':
|
||||||
|
|
||||||
if customized_message['userns']['cond'](ev, special_profiles):
|
|
||||||
ask_for_user_ns_denied(ev['execpath'], ev['comm'], False)
|
ask_for_user_ns_denied(ev['execpath'], ev['comm'], False)
|
||||||
else:
|
else:
|
||||||
add_to_profile(rule.get_clean(), ev['profile'])
|
add_to_profile(rule.get_clean(), ev['profile'])
|
||||||
|
|
||||||
|
|
||||||
customized_message = {
|
customized_message = {
|
||||||
'userns': {
|
'userns_change_profile': {
|
||||||
'cond': lambda ev, special_profiles: (ev['operation'] == 'userns_create' or ev['operation'] == 'capable') and is_special_profile_userns(ev, special_profiles),
|
'msg': _('Application {0} is transited to special profile. Capabilities could be denied')
|
||||||
'msg': 'Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?'
|
},
|
||||||
|
'userns_denied': {
|
||||||
|
'msg': _('Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?')
|
||||||
|
},
|
||||||
|
'userns_capable': {
|
||||||
|
'msg': _('Application {0} in special profile wanted to add a capability: ok?')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def customize_notification_message(ev, msg, special_profiles):
|
def customize_notification_message(ev, msg, special_profiles):
|
||||||
if customized_message['userns']['cond'](ev, special_profiles):
|
msg_type = get_event_special_type(ev, special_profiles)
|
||||||
msg = _(customized_message['userns']['msg']).format(ev['comm'])
|
if msg_type in customized_message:
|
||||||
|
msg = customized_message[msg_type]['msg'].format(ev['comm'])
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
@ -712,7 +723,6 @@ def aggregate_event(agg, ev, keys_to_aggregate):
|
|||||||
|
|
||||||
def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles):
|
def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles):
|
||||||
notification = ''
|
notification = ''
|
||||||
summary = ''
|
|
||||||
more_info = ''
|
more_info = ''
|
||||||
clean_rules = dict()
|
clean_rules = dict()
|
||||||
summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys())))
|
summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys())))
|
||||||
@ -740,10 +750,11 @@ def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles
|
|||||||
ev = data['events'][0]
|
ev = data['events'][0]
|
||||||
profile_name = ev['profile']
|
profile_name = ev['profile']
|
||||||
profile_path = aa.get_profile_filename_from_profile_name(profile_name)
|
profile_path = aa.get_profile_filename_from_profile_name(profile_name)
|
||||||
is_userns_profile = customized_message['userns']['cond'](ev, special_profiles)
|
is_userns_profile = get_event_special_type(ev, special_profiles) != 'normal'
|
||||||
|
|
||||||
if is_userns_profile:
|
if is_userns_profile:
|
||||||
bin_name = ev['comm']
|
bin_name = ev['comm']
|
||||||
|
if 'execpath' not in ev:
|
||||||
|
ev['execpath'] = None
|
||||||
bin_path = ev['execpath']
|
bin_path = ev['execpath']
|
||||||
actionable = can_leverage_userns_event(ev) == 'ok'
|
actionable = can_leverage_userns_event(ev) == 'ok'
|
||||||
else:
|
else:
|
||||||
@ -761,7 +772,7 @@ def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles
|
|||||||
rules_for_profiles.add(raw_rule)
|
rules_for_profiles.add(raw_rule)
|
||||||
|
|
||||||
if rules_for_profiles != set():
|
if rules_for_profiles != set():
|
||||||
if profile not in special_profiles:
|
if not is_userns_profile:
|
||||||
if profile_path is not None:
|
if profile_path is not None:
|
||||||
clean_rules_name = _('profile {}:').format(profile)
|
clean_rules_name = _('profile {}:').format(profile)
|
||||||
elif re_snap.match(profile):
|
elif re_snap.match(profile):
|
||||||
@ -819,9 +830,6 @@ def main():
|
|||||||
# setup exception handling
|
# setup exception handling
|
||||||
enable_aa_exception_handler()
|
enable_aa_exception_handler()
|
||||||
|
|
||||||
# setup module translations
|
|
||||||
_ = init_translation()
|
|
||||||
|
|
||||||
# Register the on_exit method with atexit
|
# Register the on_exit method with atexit
|
||||||
# Takes care of closing the debug log etc
|
# Takes care of closing the debug log etc
|
||||||
atexit.register(aa.on_exit)
|
atexit.register(aa.on_exit)
|
||||||
@ -1027,7 +1035,9 @@ def main():
|
|||||||
userns_special_profiles = config['']['userns_special_profiles'].strip().split(',')
|
userns_special_profiles = config['']['userns_special_profiles'].strip().split(',')
|
||||||
else:
|
else:
|
||||||
# By default, unconfined and unprivileged_userns are the special profiles
|
# By default, unconfined and unprivileged_userns are the special profiles
|
||||||
userns_special_profiles = ['unconfined', 'unprivileged_userns']
|
userns_special_profiles = ['unconfined', 'unprivileged_userns', 'unpriv_.*']
|
||||||
|
# To support regexes
|
||||||
|
userns_special_profiles = set_userns_special_profile(userns_special_profiles)
|
||||||
|
|
||||||
if 'ignore_denied_capability' in config['']:
|
if 'ignore_denied_capability' in config['']:
|
||||||
ignore_denied_capability = config['']['ignore_denied_capability'].strip().split(',')
|
ignore_denied_capability = config['']['ignore_denied_capability'].strip().split(',')
|
||||||
@ -1156,10 +1166,14 @@ def main():
|
|||||||
if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability:
|
if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Special behaivor for userns:
|
# Special behavior for userns:
|
||||||
if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles):
|
if get_event_special_type(ev, userns_special_profiles) != 'normal':
|
||||||
prompt_userns(ev)
|
if 'execpath' not in ev:
|
||||||
continue # Notification already displayed for this event, we go to the next one.
|
ev['execpath'] = None
|
||||||
|
|
||||||
|
if args.prompt_filter and 'userns' in args.prompt_filter:
|
||||||
|
prompt_userns(ev)
|
||||||
|
continue # Notification already displayed for this event, we go to the next one.
|
||||||
|
|
||||||
# Notifications should not be run as root, since root probably is
|
# Notifications should not be run as root, since root probably is
|
||||||
# the wrong desktop user and not the one getting the notifications.
|
# the wrong desktop user and not the one getting the notifications.
|
||||||
|
@ -89,8 +89,8 @@ System-wide configuration for B<aa-notify> is done via
|
|||||||
# Set to 'no' to disable AppArmor notifications globally
|
# Set to 'no' to disable AppArmor notifications globally
|
||||||
show_notifications="yes"
|
show_notifications="yes"
|
||||||
|
|
||||||
# Special profiles used to remove privileges for unconfined binaries using user namespaces. If unsure, leave as is.
|
# Special profiles used to remove privileges for unconfined binaries using user namespaces. Special profiles use Python's regular expression syntax. If unsure, leave as is.
|
||||||
userns_special_profiles="unconfined,unprivileged_userns"
|
userns_special_profiles="unconfined,unprivileged_userns,unpriv_.*"
|
||||||
|
|
||||||
# Theme for aa-notify GUI. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes.
|
# Theme for aa-notify GUI. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes.
|
||||||
interface_theme="ubuntu"
|
interface_theme="ubuntu"
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import tkinter as tk
|
import tkinter as tk
|
||||||
import tkinter.ttk as ttk
|
import tkinter.ttk as ttk
|
||||||
|
import tkinter.font
|
||||||
import subprocess
|
import subprocess
|
||||||
import apparmor.aa as aa
|
import apparmor.aa as aa
|
||||||
|
|
||||||
@ -205,7 +206,7 @@ class ShowMoreGUIAggregated(GUI):
|
|||||||
|
|
||||||
def create_profile_rules_frame(self, parent, clean_rules):
|
def create_profile_rules_frame(self, parent, clean_rules):
|
||||||
for profile_name, profile_rules in clean_rules.items():
|
for profile_name, profile_rules in clean_rules.items():
|
||||||
label = ttk.Label(parent, text=profile_name, font=tk.font.BOLD)
|
label = ttk.Label(parent, text=profile_name, font=tkinter.font.BOLD)
|
||||||
label.pack(anchor='w', pady=(5, 0))
|
label.pack(anchor='w', pady=(5, 0))
|
||||||
label.bind("<Button-1>", lambda event, rules=profile_rules: self.toggle_profile_rules(rules))
|
label.bind("<Button-1>", lambda event, rules=profile_rules: self.toggle_profile_rules(rules))
|
||||||
|
|
||||||
|
@ -113,7 +113,6 @@ class ReadLog:
|
|||||||
return log_entry
|
return log_entry
|
||||||
|
|
||||||
def get_event_type(self, e):
|
def get_event_type(self, e):
|
||||||
|
|
||||||
if e['operation'] == 'exec':
|
if e['operation'] == 'exec':
|
||||||
return 'file'
|
return 'file'
|
||||||
elif e['class'] and e['class'] == 'namespace':
|
elif e['class'] and e['class'] == 'namespace':
|
||||||
@ -131,6 +130,8 @@ class ReadLog:
|
|||||||
return 'pivot_root'
|
return 'pivot_root'
|
||||||
elif e['class'] and e['class'] == 'net' and e['family'] and e['family'] == 'unix':
|
elif e['class'] and e['class'] == 'net' and e['family'] and e['family'] == 'unix':
|
||||||
return 'unix'
|
return 'unix'
|
||||||
|
elif e['operation'] == 'change_onexec':
|
||||||
|
return 'change_profile'
|
||||||
elif e['class'] == 'file' or self.op_type(e) == 'file':
|
elif e['class'] == 'file' or self.op_type(e) == 'file':
|
||||||
return 'file'
|
return 'file'
|
||||||
elif e['operation'] == 'capable':
|
elif e['operation'] == 'capable':
|
||||||
@ -160,6 +161,8 @@ class ReadLog:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def create_rule_from_ev(self, ev):
|
def create_rule_from_ev(self, ev):
|
||||||
|
if not ev:
|
||||||
|
return None
|
||||||
event_type = self.get_event_type(ev)
|
event_type = self.get_event_type(ev)
|
||||||
if not event_type:
|
if not event_type:
|
||||||
return None
|
return None
|
||||||
@ -244,7 +247,7 @@ class ReadLog:
|
|||||||
|
|
||||||
elif event_type == 'io_uring':
|
elif event_type == 'io_uring':
|
||||||
ev['peer_profile'] = event.peer_profile
|
ev['peer_profile'] = event.peer_profile
|
||||||
elif event_type == 'capability':
|
elif event_type == 'capability' or ev['operation'] == 'change_onexec':
|
||||||
ev['comm'] = event.comm
|
ev['comm'] = event.comm
|
||||||
|
|
||||||
if not ev['time']:
|
if not ev['time']:
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
import os
|
import os
|
||||||
import struct
|
import struct
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import re
|
||||||
|
|
||||||
from apparmor.common import AppArmorBug, DebugLogger
|
from apparmor.common import AppArmorBug, DebugLogger
|
||||||
|
|
||||||
@ -129,3 +130,33 @@ def get_last_login_timestamp_wtmp(username, filename='/var/log/wtmp'):
|
|||||||
|
|
||||||
# When loop is done, last value should be the latest login timestamp
|
# When loop is done, last value should be the latest login timestamp
|
||||||
return last_login
|
return last_login
|
||||||
|
|
||||||
|
|
||||||
|
def is_special_profile_userns(ev, special_profiles):
|
||||||
|
if 'comm' not in ev:
|
||||||
|
return False # special profiles have a 'comm' entry
|
||||||
|
|
||||||
|
if not special_profiles or not special_profiles.match(ev['profile']):
|
||||||
|
return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def get_event_special_type(ev, special_profiles):
|
||||||
|
if is_special_profile_userns(ev, special_profiles):
|
||||||
|
if ev['operation'] == 'userns_create':
|
||||||
|
if ev['aamode'] == 'REJECTING':
|
||||||
|
return 'userns_denied'
|
||||||
|
else:
|
||||||
|
return 'userns_change_profile'
|
||||||
|
elif ev['operation'] == 'change_onexec':
|
||||||
|
return 'userns_change_profile'
|
||||||
|
elif ev['operation'] == 'capable':
|
||||||
|
return 'userns_capable'
|
||||||
|
else:
|
||||||
|
raise AppArmorBug('unexpected operation: %s' % ev['operation'])
|
||||||
|
return 'normal'
|
||||||
|
|
||||||
|
|
||||||
|
def set_userns_special_profile(special_profiles):
|
||||||
|
return re.compile('^({})$'.format('|'.join(special_profiles)))
|
||||||
|
@ -11,8 +11,8 @@
|
|||||||
# Set to 'no' to disable AppArmor notifications globally
|
# Set to 'no' to disable AppArmor notifications globally
|
||||||
show_notifications="yes"
|
show_notifications="yes"
|
||||||
|
|
||||||
# Special profiles used to remove privileges for unconfined binaries using user namespaces. If unsure, leave as is.
|
# Special profiles used to remove privileges for unconfined binaries using user namespaces. Special profiles use Python's regular expression syntax. If unsure, leave as is.
|
||||||
userns_special_profiles="unconfined,unprivileged_userns"
|
userns_special_profiles="unconfined,unprivileged_userns,unpriv_.*"
|
||||||
|
|
||||||
# Theme to use for aa-notify GUI themes. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes.
|
# Theme to use for aa-notify GUI themes. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes.
|
||||||
interface_theme="ubuntu"
|
interface_theme="ubuntu"
|
||||||
|
@ -12,7 +12,8 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from apparmor.common import AppArmorBug
|
from apparmor.common import AppArmorBug
|
||||||
from apparmor.notify import get_last_login_timestamp, get_last_login_timestamp_wtmp, sane_timestamp
|
from apparmor.notify import get_last_login_timestamp, get_last_login_timestamp_wtmp, sane_timestamp, get_event_special_type, set_userns_special_profile
|
||||||
|
from apparmor.logparser import ReadLog
|
||||||
from common_test import AATest, setup_all_loops
|
from common_test import AATest, setup_all_loops
|
||||||
|
|
||||||
|
|
||||||
@ -87,6 +88,36 @@ class TestGet_last_login_timestamp_wtmp(AATest):
|
|||||||
get_last_login_timestamp_wtmp('root', 'wtmp-examples/wtmp-x86_64-past')
|
get_last_login_timestamp_wtmp('root', 'wtmp-examples/wtmp-x86_64-past')
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventSpecialType(AATest):
|
||||||
|
userns_special_profiles = set_userns_special_profile(['unconfined', 'unprivileged_userns', 'unpriv_.*'])
|
||||||
|
parser = ReadLog('', '', '')
|
||||||
|
tests = (
|
||||||
|
('[ 176.385388] audit: type=1400 audit(1666891380.570:78): apparmor="DENIED" operation="userns_create" class="namespace" profile="/usr/bin/bwrap-userns-restrict" pid=1785 comm="userns_child_ex" requested="userns_create" denied="userns_create"', 'normal'),
|
||||||
|
('[ 839.488169] audit: type=1400 audit(1752065668.819:208): apparmor="DENIED" operation="userns_create" class="namespace" info="Userns create restricted - failed to find unprivileged_userns profile" error=-13 profile="unconfined" pid=12124 comm="unshare" requested="userns_create" denied="userns_create" target="unprivileged_userns"', 'userns_denied'),
|
||||||
|
('[ 429.272003] audit: type=1400 audit(1720613712.153:168): apparmor="AUDIT" operation="userns_create" class="namespace" info="Userns create - transitioning profile" profile="unconfined" pid=5630 comm="unshare" requested="userns_create" target="unprivileged_userns" execpath="/usr/bin/unshare"', 'userns_change_profile'),
|
||||||
|
('[ 52.901383] audit: type=1400 audit(1752064882.228:82): apparmor="DENIED" operation="capable" class="cap" profile="unprivileged_userns" pid=6700 comm="electron" capability=21 capname="sys_admin"', 'userns_capable'),
|
||||||
|
('Jul 31 17:11:16 dbusdev-saucy-amd64 dbus[1692]: apparmor="DENIED" operation="dbus_bind" bus="session" name="com.apparmor.Test" mask="bind" pid=2940 profile="/tmp/apparmor-2.8.0/tests/regression/apparmor/dbus_service"', 'normal'),
|
||||||
|
('[103975.623545] audit: type=1400 audit(1481284511.494:2807): apparmor="DENIED" operation="change_onexec" info="no new privs" error=-1 namespace="root//lxd-tor_<var-lib-lxd>" profile="unconfined" name="system_tor" pid=18593 comm="(tor)" target="system_tor"', 'userns_change_profile'),
|
||||||
|
('[78661.551820] audit: type=1400 audit(1752661047.170:350): apparmor="DENIED" operation="capable" class="cap" profile="unpriv_bwrap" pid=1412550 comm="node" capability=21 capname="sys_admin"', 'userns_capable'),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _run_test(self, ev, expected):
|
||||||
|
parsed_event = self.parser.parse_event(ev)
|
||||||
|
r = self.parser.create_rule_from_ev(parsed_event)
|
||||||
|
self.assertIsNotNone(r)
|
||||||
|
|
||||||
|
real_type = get_event_special_type(parsed_event, self.userns_special_profiles)
|
||||||
|
self.assertEqual(expected, real_type,
|
||||||
|
"ev {}: {} != {}".format(ev, expected, real_type))
|
||||||
|
|
||||||
|
def test_invalid(self):
|
||||||
|
ev = 'type=AVC msg=audit(1333698107.128:273917): apparmor="DENIED" operation="recvmsg" parent=1596 profile="unprivileged_userns" pid=1875 comm="nc" laddr=::ffff:127.0.0.1 lport=2048 faddr=::ffff:127.0.0.1 fport=59180 family="inet6" sock_type="stream" protocol=6'
|
||||||
|
parsed_event = self.parser.parse_event(ev)
|
||||||
|
parsed_event['comm'] = 'something' # Artificially crafted invalid event
|
||||||
|
with self.assertRaises(AppArmorBug):
|
||||||
|
get_event_special_type(parsed_event, self.userns_special_profiles)
|
||||||
|
|
||||||
|
|
||||||
setup_all_loops(__name__)
|
setup_all_loops(__name__)
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main(verbosity=1)
|
unittest.main(verbosity=1)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user