2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-22 01:57:43 +00:00

Create get_event_type instead of customized_message['userns']['cond']

This improves the code readability

Signed-off-by: Maxime Bélair <maxime.belair@canonical.com>
This commit is contained in:
Maxime Bélair 2025-07-09 11:02:29 +02:00 committed by John Johansen
parent 84fbd87334
commit 71a71e0fa7
3 changed files with 70 additions and 38 deletions

View File

@ -53,7 +53,7 @@ import apparmor.update_profile as update_profile
import LibAppArmor # C-library to parse one log line
from apparmor.common import DebugLogger, open_file_read
from apparmor.fail import enable_aa_exception_handler
from apparmor.notify import get_last_login_timestamp
from apparmor.notify import get_last_login_timestamp, get_event_special_type
from apparmor.translations import init_translation
from apparmor.logparser import ReadLog
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme, ProfileRules
@ -66,6 +66,9 @@ import threading
gi.require_version('GLib', '2.0')
# setup module translations
_ = init_translation()
def get_user_login():
"""Portable function to get username.
@ -449,22 +452,15 @@ def compile_filter_regex(filters):
def can_allow_rule(ev, special_profiles):
if customized_message['userns']['cond'](ev, special_profiles):
ev_type = get_event_special_type(ev, special_profiles)
if ev_type != 'normal':
if ev['execpath'] is None:
return False
return not aa.get_profile_filename_from_profile_name(ev['comm'])
else:
return aa.get_profile_filename_from_profile_name(ev['profile']) is not None
def is_special_profile_userns(ev, special_profiles):
if not special_profiles or ev['profile'] not in special_profiles:
return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns
if 'execpath' not in ev or not ev['execpath']:
ev['execpath'] = aa.find_executable(ev['comm'])
return True
def create_userns_profile(name, path, ans):
update_profile_path = update_profile.__file__
@ -544,7 +540,10 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='')
if value:
out += '\t{} = {}\n'.format(_(key), value)
out += _('\nThe software that declined this operation is {}\n').format(ev['profile'])
if ev['aamode'] == 'REJECTING':
out += _('\nThe profile that denied this operation is {}\n').format(ev['profile'])
else:
out += _('\nThe profile that triggered this alert is {}\n').format(ev['profile'])
rule = rl.create_rule_from_ev(ev)
@ -552,15 +551,15 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='')
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
rule.exec_perms = 'Pix'
aa.update_profiles()
if customized_message['userns']['cond'](ev, special_profiles):
out += _('You may allow it through a dedicated unconfined profile for {}.').format(ev['comm'])
if get_event_special_type(ev, special_profiles) != 'normal':
userns_event_usable = can_leverage_userns_event(ev)
if userns_event_usable == 'error_cannot_find_path':
raw_rule = _('# You may allow it through a dedicated unconfined profile for {0}. However, apparmor cannot find {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm'])
raw_rule = _('# You may allow it through a dedicated unconfined profile for {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm'])
elif userns_event_usable == 'error_userns_profile_exists':
raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath'])
elif userns_event_usable == 'ok':
raw_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath'])
out += raw_rule[1:]
else:
raw_rule = rule.get_clean()
if profile_path:
@ -570,7 +569,6 @@ def get_more_info_about_event(rl, ev, special_profiles, profile_path, header='')
out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir)
else: # Should not happen
out += _('ERROR: Could not create rule from event.')
return out, raw_rule
@ -589,7 +587,6 @@ def cb_more_info(notification, action, _args):
if ans == 'add_rule':
add_to_profile(raw_rule, ev['profile'])
elif ans in {'allow', 'deny'}:
customized_message['userns']['cond'](ev, special_profiles)
create_userns_profile(ev['comm'], ev['execpath'], ans)
@ -671,24 +668,29 @@ def cb_add_to_profile(notification, action, _args):
aa.update_profiles()
if customized_message['userns']['cond'](ev, special_profiles):
if get_event_special_type(ev, special_profiles) != 'normal':
ask_for_user_ns_denied(ev['execpath'], ev['comm'], False)
else:
add_to_profile(rule.get_clean(), ev['profile'])
customized_message = {
'userns': {
'cond': lambda ev, special_profiles: (ev['operation'] == 'userns_create' or ev['operation'] == 'capable') and is_special_profile_userns(ev, special_profiles),
'msg': 'Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?'
'userns_change_profile': {
'msg': _('Application {0} is transited to special profile. Capabilities could be denied')
},
'userns_denied': {
'msg': _('Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?')
},
'userns_capable': {
'msg': _('Application {0} in special profile wanted to add a capability: ok?')
}
}
def customize_notification_message(ev, msg, special_profiles):
if customized_message['userns']['cond'](ev, special_profiles):
msg = _(customized_message['userns']['msg']).format(ev['comm'])
msg_type = get_event_special_type(ev, special_profiles)
if msg_type in customized_message:
msg = customized_message[msg_type]['msg'].format(ev['comm'])
return msg
@ -712,7 +714,6 @@ def aggregate_event(agg, ev, keys_to_aggregate):
def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles):
notification = ''
summary = ''
more_info = ''
clean_rules = dict()
summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys())))
@ -740,10 +741,11 @@ def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles
ev = data['events'][0]
profile_name = ev['profile']
profile_path = aa.get_profile_filename_from_profile_name(profile_name)
is_userns_profile = customized_message['userns']['cond'](ev, special_profiles)
is_userns_profile = get_event_special_type(ev, special_profiles) != 'normal'
if is_userns_profile:
bin_name = ev['comm']
if 'execpath' not in ev:
ev['execpath'] = None
bin_path = ev['execpath']
actionable = can_leverage_userns_event(ev) == 'ok'
else:
@ -761,7 +763,7 @@ def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles
rules_for_profiles.add(raw_rule)
if rules_for_profiles != set():
if profile not in special_profiles:
if not is_userns_profile:
if profile_path is not None:
clean_rules_name = _('profile {}:').format(profile)
elif re_snap.match(profile):
@ -819,9 +821,6 @@ def main():
# setup exception handling
enable_aa_exception_handler()
# setup module translations
_ = init_translation()
# Register the on_exit method with atexit
# Takes care of closing the debug log etc
atexit.register(aa.on_exit)
@ -1156,10 +1155,14 @@ def main():
if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability:
continue
# Special behaivor for userns:
if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles):
prompt_userns(ev)
continue # Notification already displayed for this event, we go to the next one.
# Special behavior for userns:
if get_event_special_type(ev, userns_special_profiles) != 'normal':
if 'execpath' not in ev:
ev['execpath'] = None
if args.prompt_filter and 'userns' in args.prompt_filter:
prompt_userns(ev)
continue # Notification already displayed for this event, we go to the next one.
# Notifications should not be run as root, since root probably is
# the wrong desktop user and not the one getting the notifications.

View File

@ -113,7 +113,6 @@ class ReadLog:
return log_entry
def get_event_type(self, e):
if e['operation'] == 'exec':
return 'file'
elif e['class'] and e['class'] == 'namespace':
@ -131,6 +130,8 @@ class ReadLog:
return 'pivot_root'
elif e['class'] and e['class'] == 'net' and e['family'] and e['family'] == 'unix':
return 'unix'
elif e['operation'] == 'change_onexec':
return 'change_profile'
elif e['class'] == 'file' or self.op_type(e) == 'file':
return 'file'
elif e['operation'] == 'capable':
@ -160,6 +161,8 @@ class ReadLog:
return None
def create_rule_from_ev(self, ev):
if not ev:
return None
event_type = self.get_event_type(ev)
if not event_type:
return None
@ -244,7 +247,7 @@ class ReadLog:
elif event_type == 'io_uring':
ev['peer_profile'] = event.peer_profile
elif event_type == 'capability':
elif event_type == 'capability' or ev['operation'] == 'change_onexec':
ev['comm'] = event.comm
if not ev['time']:

View File

@ -129,3 +129,29 @@ def get_last_login_timestamp_wtmp(username, filename='/var/log/wtmp'):
# When loop is done, last value should be the latest login timestamp
return last_login
def is_special_profile_userns(ev, special_profiles):
if 'comm' not in ev:
return False # special profiles have a 'comm' entry
if not special_profiles or not any(p.match(ev['profile']) for p in special_profiles):
return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns
return True
def get_event_special_type(ev, special_profiles):
if is_special_profile_userns(ev, special_profiles):
if ev['operation'] == 'userns_create':
if ev['aamode'] == 'REJECTING':
return 'userns_denied'
else:
return 'userns_change_profile'
elif ev['operation'] == 'change_onexec':
return 'userns_change_profile'
elif ev['operation'] == 'capable':
return 'userns_capable'
else:
raise AppArmorBug('unexpected operation: %s' % ev['operation'])
return 'normal'