mirror of
https://gitlab.com/apparmor/apparmor
synced 2025-09-04 16:25:10 +00:00
utils: add userns python tool support
Signed-off-by: Georgia Garcia <georgia.garcia@canonical.com>
This commit is contained in:
@@ -50,6 +50,7 @@ from apparmor.rule.include import IncludeRule
|
||||
from apparmor.rule.network import NetworkRule
|
||||
from apparmor.rule.ptrace import PtraceRule
|
||||
from apparmor.rule.signal import SignalRule
|
||||
from apparmor.rule.userns import UserNamespaceRule
|
||||
from apparmor.translations import init_translation
|
||||
|
||||
_ = init_translation()
|
||||
@@ -1721,6 +1722,12 @@ def collapse_log(hashlog, ignore_null_profiles=True):
|
||||
if not hat_exists or not is_known_rule(aa[profile][hat], 'signal', signal_event):
|
||||
log_dict[aamode][full_profile]['signal'].add(signal_event)
|
||||
|
||||
userns = hashlog[aamode][full_profile]['userns']
|
||||
for access in userns.keys():
|
||||
userns_event = UserNamespaceRule(access)
|
||||
if not hat_exists or not is_known_rule(aa[profile][hat], 'userns', userns_event):
|
||||
log_dict[aamode][full_profile]['userns'].add(userns_event)
|
||||
|
||||
return log_dict
|
||||
|
||||
|
||||
@@ -2096,6 +2103,7 @@ def match_line_against_rule_classes(line, profile, file, lineno, in_preamble):
|
||||
'ptrace',
|
||||
'rlimit',
|
||||
'signal',
|
||||
'userns',
|
||||
):
|
||||
|
||||
if rule_name in ruletypes:
|
||||
|
@@ -57,6 +57,7 @@ class ReadLog:
|
||||
'path': hasher(),
|
||||
'ptrace': hasher(),
|
||||
'signal': hasher(),
|
||||
'userns': hasher(),
|
||||
}
|
||||
|
||||
def prefetch_next_log_entry(self):
|
||||
@@ -184,6 +185,11 @@ class ReadLog:
|
||||
self.hashlog[aamode][full_profile]['exec'][e['name']][e['name2']] = True
|
||||
return
|
||||
|
||||
elif e['class'] and e['class'] == 'namespace':
|
||||
if e['denied_mask'].startswith('userns'):
|
||||
self.hashlog[aamode][full_profile]['userns'][e['denied_mask'].removeprefix('userns_')] = True
|
||||
return None
|
||||
|
||||
elif self.op_type(e) == 'file':
|
||||
# Map c (create) and d (delete) to w (logging is more detailed than the profile language)
|
||||
dmask = e['denied_mask']
|
||||
|
@@ -27,6 +27,7 @@ from apparmor.rule.network import NetworkRule, NetworkRuleset
|
||||
from apparmor.rule.ptrace import PtraceRule, PtraceRuleset
|
||||
from apparmor.rule.rlimit import RlimitRule, RlimitRuleset
|
||||
from apparmor.rule.signal import SignalRule, SignalRuleset
|
||||
from apparmor.rule.userns import UserNamespaceRule, UserNamespaceRuleset
|
||||
from apparmor.translations import init_translation
|
||||
|
||||
_ = init_translation()
|
||||
@@ -42,6 +43,7 @@ ruletypes = {
|
||||
'ptrace': {'rule': PtraceRule, 'ruleset': PtraceRuleset},
|
||||
'rlimit': {'rule': RlimitRule, 'ruleset': RlimitRuleset},
|
||||
'signal': {'rule': SignalRule, 'ruleset': SignalRuleset},
|
||||
'userns': {'rule': UserNamespaceRule, 'ruleset': UserNamespaceRuleset},
|
||||
}
|
||||
|
||||
|
||||
@@ -191,6 +193,7 @@ class ProfileStorage:
|
||||
'unix',
|
||||
'file',
|
||||
'change_profile',
|
||||
'userns',
|
||||
]
|
||||
|
||||
data = []
|
||||
|
@@ -51,6 +51,7 @@ RE_PROFILE_SIGNAL = re.compile(RE_AUDIT_DENY + '(signal\s*,|signal(?P<details>\s
|
||||
RE_PROFILE_PTRACE = re.compile(RE_AUDIT_DENY + '(ptrace\s*,|ptrace(?P<details>\s+[^#]*)\s*,)' + RE_EOL)
|
||||
RE_PROFILE_PIVOT_ROOT = re.compile(RE_AUDIT_DENY + '(pivot_root\s*,|pivot_root\s+[^#]*\s*,)' + RE_EOL)
|
||||
RE_PROFILE_UNIX = re.compile(RE_AUDIT_DENY + '(unix\s*,|unix\s+[^#]*\s*,)' + RE_EOL)
|
||||
RE_PROFILE_USERNS = re.compile(RE_AUDIT_DENY + '(userns\s*,|userns(?P<details>\s+[^#]*)\s*,)' + RE_EOL)
|
||||
|
||||
# match anything that's not " or #, or matching quotes with anything except quotes inside
|
||||
__re_no_or_quoted_hash = '([^#"]|"[^"]*")*'
|
||||
|
135
utils/apparmor/rule/userns.py
Normal file
135
utils/apparmor/rule/userns.py
Normal file
@@ -0,0 +1,135 @@
|
||||
# ----------------------------------------------------------------------
|
||||
# Copyright (C) 2022 Canonical, Ltd.
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of version 2 of the GNU General Public
|
||||
# License as published by the Free Software Foundation.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
import re
|
||||
|
||||
from apparmor.regex import RE_PROFILE_USERNS
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, check_and_split_list, logprof_value_or_all, parse_modifiers
|
||||
|
||||
# setup module translations
|
||||
from apparmor.translations import init_translation
|
||||
_ = init_translation()
|
||||
|
||||
access_keyword = 'create'
|
||||
|
||||
RE_USERNS_DETAILS = re.compile(
|
||||
'^' +
|
||||
r'\s+(?P<access>' + access_keyword + ')?' + # optional access keyword
|
||||
r'\s*$')
|
||||
|
||||
|
||||
class UserNamespaceRule(BaseRule):
|
||||
'''Class to handle and store a single userns rule'''
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field UserNamespaceRule.ALL
|
||||
class __UserNamespaceAll(object):
|
||||
pass
|
||||
|
||||
ALL = __UserNamespaceAll
|
||||
|
||||
rule_name = 'userns'
|
||||
|
||||
def __init__(self, access, audit=False, deny=False,
|
||||
allow_keyword=False, comment='', log_event=None):
|
||||
|
||||
super(UserNamespaceRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
|
||||
self.access, self.all_access, unknown_items = check_and_split_list(access, access_keyword, UserNamespaceRule.ALL, 'UserNamespaceRule', 'access')
|
||||
if unknown_items:
|
||||
raise AppArmorException(_('Passed unknown access keyword to UserNamespaceRule: %s') % ' '.join(unknown_items))
|
||||
|
||||
@classmethod
|
||||
def _match(cls, raw_rule):
|
||||
return RE_PROFILE_USERNS.search(raw_rule)
|
||||
|
||||
@classmethod
|
||||
def _create_instance(cls, raw_rule):
|
||||
'''parse raw_rule and return UserNamespaceRule'''
|
||||
|
||||
matches = cls._match(raw_rule)
|
||||
if not matches:
|
||||
raise AppArmorException(_("Invalid userns rule '%s'") % raw_rule)
|
||||
|
||||
audit, deny, allow_keyword, comment = parse_modifiers(matches)
|
||||
|
||||
rule_details = ''
|
||||
if matches.group('details'):
|
||||
rule_details = matches.group('details')
|
||||
|
||||
if rule_details:
|
||||
details = RE_USERNS_DETAILS.search(rule_details)
|
||||
if not details:
|
||||
raise AppArmorException(_("Invalid or unknown keywords in 'userns %s" % rule_details))
|
||||
|
||||
access = details.group('access')
|
||||
else:
|
||||
access = UserNamespaceRule.ALL
|
||||
|
||||
return UserNamespaceRule(access, audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword, comment=comment)
|
||||
|
||||
def get_clean(self, depth=0):
|
||||
'''return rule (in clean/default formatting)'''
|
||||
|
||||
space = ' ' * depth
|
||||
|
||||
if self.all_access:
|
||||
access = ''
|
||||
elif self.access:
|
||||
access = ' %s' % ' '.join(self.access)
|
||||
else:
|
||||
raise AppArmorBug('Empty access in userns rule')
|
||||
|
||||
return('%s%suserns%s,%s' % (space, self.modifiers_str(), access, self.comment))
|
||||
|
||||
def is_covered_localvars(self, other_rule):
|
||||
'''check if other_rule is covered by this rule object'''
|
||||
|
||||
if not self._is_covered_list(self.access, self.all_access, other_rule.access, other_rule.all_access, 'access'):
|
||||
return False
|
||||
|
||||
# still here? -> then it is covered
|
||||
return True
|
||||
|
||||
def is_equal_localvars(self, rule_obj, strict):
|
||||
'''compare if rule-specific variables are equal'''
|
||||
|
||||
if not type(rule_obj) == UserNamespaceRule:
|
||||
raise AppArmorBug('Passed non-userns rule: %s' % str(rule_obj))
|
||||
|
||||
if (self.access != rule_obj.access or
|
||||
self.all_access != rule_obj.all_access):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def logprof_header_localvars(self):
|
||||
access = logprof_value_or_all(self.access, self.all_access)
|
||||
|
||||
return (
|
||||
_('Access mode'), access,
|
||||
)
|
||||
|
||||
|
||||
class UserNamespaceRuleset(BaseRuleset):
|
||||
'''Class to handle and store a collection of userns rules'''
|
||||
|
||||
def get_glob(self, path_or_rule):
|
||||
'''Return the next possible glob. For userns rules, that means removing access'''
|
||||
return 'userns,'
|
@@ -170,8 +170,6 @@ log_to_profile_skip = [
|
||||
|
||||
'testcase_changehat_01', # interactive, asks to add a hat
|
||||
'testcase_dbus_09', # multiline log not currently supported
|
||||
|
||||
'testcase_userns_01', # userns currently not supported
|
||||
]
|
||||
|
||||
# tests that cause an empty log
|
||||
|
@@ -298,14 +298,6 @@ unknown_line = (
|
||||
'bare_include_tests/ok_84.sd',
|
||||
'bare_include_tests/ok_85.sd',
|
||||
'bare_include_tests/ok_86.sd',
|
||||
|
||||
'namespaces/ok_userns_01.sd',
|
||||
'namespaces/ok_userns_02.sd',
|
||||
'namespaces/ok_userns_03.sd',
|
||||
'namespaces/ok_userns_04.sd',
|
||||
'namespaces/ok_userns_05.sd',
|
||||
'namespaces/ok_userns_06.sd',
|
||||
'namespaces/ok_userns_07.sd',
|
||||
)
|
||||
|
||||
# testcases with various unexpected failures
|
||||
|
152
utils/test/test-userns.py
Normal file
152
utils/test/test-userns.py
Normal file
@@ -0,0 +1,152 @@
|
||||
#!/usr/bin/python3
|
||||
# ----------------------------------------------------------------------
|
||||
# Copyright (C) 2022 Canonical, Ltd.
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of version 2 of the GNU General Public
|
||||
# License as published by the Free Software Foundation.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
import unittest
|
||||
from collections import namedtuple
|
||||
from common_test import AATest, setup_all_loops
|
||||
|
||||
from apparmor.rule.userns import UserNamespaceRule, UserNamespaceRuleset
|
||||
from apparmor.common import AppArmorException, AppArmorBug
|
||||
from apparmor.translations import init_translation
|
||||
_ = init_translation()
|
||||
|
||||
|
||||
class UserNamespaceTestParse(AATest):
|
||||
tests = (
|
||||
# access audit deny allow comment
|
||||
('userns,', UserNamespaceRule(UserNamespaceRule.ALL, False, False, False, '')),
|
||||
('userns create,', UserNamespaceRule(('create'), False, False, False, '')),
|
||||
('audit userns create,', UserNamespaceRule(('create'), True, False, False, '')),
|
||||
('deny userns,', UserNamespaceRule(UserNamespaceRule.ALL, False, True, False, '')),
|
||||
('audit allow userns,', UserNamespaceRule(UserNamespaceRule.ALL, True, False, True, '')),
|
||||
('userns create, # cmt', UserNamespaceRule(('create'), False, False, False, ' # cmt')),
|
||||
)
|
||||
|
||||
def _run_test(self, rawrule, expected):
|
||||
self.assertTrue(UserNamespaceRule.match(rawrule))
|
||||
obj = UserNamespaceRule.create_instance(rawrule)
|
||||
expected.raw_rule = rawrule.strip()
|
||||
self.assertTrue(obj.is_equal(expected, True))
|
||||
|
||||
|
||||
class UserNamespaceTestParseInvalid(AATest):
|
||||
tests = (
|
||||
('userns invalidaccess,', AppArmorException),
|
||||
)
|
||||
|
||||
def _run_test(self, rawrule, expected):
|
||||
self.assertTrue(UserNamespaceRule.match(rawrule)) # the above invalid rules still match the main regex!
|
||||
with self.assertRaises(expected):
|
||||
UserNamespaceRule.create_instance(rawrule)
|
||||
|
||||
def test_parse_fail(self):
|
||||
with self.assertRaises(AppArmorException):
|
||||
UserNamespaceRule.create_instance('foo,')
|
||||
|
||||
def test_diff_non_usernsrule(self):
|
||||
exp = namedtuple('exp', ('audit', 'deny'))
|
||||
obj = UserNamespaceRule(('create'))
|
||||
with self.assertRaises(AppArmorBug):
|
||||
obj.is_equal(exp(False, False), False)
|
||||
|
||||
def test_diff_access(self):
|
||||
obj1 = UserNamespaceRule(UserNamespaceRule.ALL)
|
||||
obj2 = UserNamespaceRule(('create'))
|
||||
self.assertFalse(obj1.is_equal(obj2, False))
|
||||
|
||||
|
||||
class InvalidUserNamespaceInit(AATest):
|
||||
tests = (
|
||||
# init params expected exception
|
||||
((''), TypeError), # empty access
|
||||
((' '), AppArmorBug), # whitespace access
|
||||
(('xyxy'), AppArmorException), # invalid access
|
||||
(dict(), TypeError), # wrong type for access
|
||||
(None, TypeError), # wrong type for access
|
||||
)
|
||||
|
||||
def _run_test(self, params, expected):
|
||||
with self.assertRaises(expected):
|
||||
UserNamespaceRule(*params)
|
||||
|
||||
def test_missing_params(self):
|
||||
with self.assertRaises(TypeError):
|
||||
UserNamespaceRule()
|
||||
|
||||
class WriteUserNamespaceTestAATest(AATest):
|
||||
tests = (
|
||||
# raw rule clean rule
|
||||
(' userns , # foo ', 'userns, # foo'),
|
||||
(' audit userns create,', 'audit userns create,'),
|
||||
(' deny userns ,# foo bar', 'deny userns, # foo bar'),
|
||||
(' allow userns create ,# foo bar', 'allow userns create, # foo bar'),
|
||||
('userns,', 'userns,'),
|
||||
('userns create,', 'userns create,'),
|
||||
)
|
||||
|
||||
def _run_test(self, rawrule, expected):
|
||||
self.assertTrue(UserNamespaceRule.match(rawrule))
|
||||
obj = UserNamespaceRule.create_instance(rawrule)
|
||||
clean = obj.get_clean()
|
||||
raw = obj.get_raw()
|
||||
|
||||
self.assertEqual(expected.strip(), clean, 'unexpected clean rule')
|
||||
self.assertEqual(rawrule.strip(), raw, 'unexpected raw rule')
|
||||
|
||||
def test_write_manually(self):
|
||||
obj = UserNamespaceRule('create', allow_keyword=True)
|
||||
|
||||
expected = ' allow userns create,'
|
||||
|
||||
self.assertEqual(expected, obj.get_clean(2), 'unexpected clean rule')
|
||||
self.assertEqual(expected, obj.get_raw(2), 'unexpected raw rule')
|
||||
|
||||
def test_write_invalid_access(self):
|
||||
obj = UserNamespaceRule('create')
|
||||
obj.access = ''
|
||||
with self.assertRaises(AppArmorBug):
|
||||
obj.get_clean()
|
||||
|
||||
|
||||
class UserNamespaceIsCoveredTest(AATest):
|
||||
def test_is_covered(self):
|
||||
obj = UserNamespaceRule(UserNamespaceRule.ALL)
|
||||
self.assertTrue(obj.is_covered(UserNamespaceRule(('create'))))
|
||||
self.assertTrue(obj.is_covered(UserNamespaceRule(UserNamespaceRule.ALL)))
|
||||
|
||||
def test_is_not_covered(self):
|
||||
obj = UserNamespaceRule(('create'))
|
||||
self.assertFalse(obj.is_covered(UserNamespaceRule(UserNamespaceRule.ALL)))
|
||||
|
||||
|
||||
class UserNamespaceLogprofHeaderTest(AATest):
|
||||
tests = (
|
||||
('userns,', [_('Access mode'), _('ALL')]),
|
||||
('userns create,', [_('Access mode'), 'create']),
|
||||
)
|
||||
|
||||
def _run_test(self, params, expected):
|
||||
obj = UserNamespaceRule.create_instance(params)
|
||||
self.assertEqual(obj.logprof_header(), expected)
|
||||
|
||||
|
||||
class UserNamespaceGlobTestAATest(AATest):
|
||||
def test_glob(self):
|
||||
self.assertEqual(UserNamespaceRuleset().get_glob('userns create,'), 'userns,')
|
||||
|
||||
|
||||
setup_all_loops(__name__)
|
||||
if __name__ == '__main__':
|
||||
unittest.main(verbosity=1)
|
Reference in New Issue
Block a user