2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-22 10:07:12 +00:00
2024-03-29 13:09:06 +00:00

241 lines
9.0 KiB
Python

# ----------------------------------------------------------------------
# Copyright (C) 2024 Canonical, Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# ----------------------------------------------------------------------
import re
from apparmor.common import AppArmorException
from apparmor.regex import RE_PROFILE_UNIX, strip_parenthesis
from apparmor.rule import AARE
from apparmor.rule import BaseRule, BaseRuleset, parse_modifiers, logprof_value_or_all, check_and_split_list
from apparmor.translations import init_translation
_ = init_translation()
_aare = r'([][!/\\\,().*@{}\w^-]+)'
_quoted_aare = r'"([][!/\\\,().*@{}\w\s^-]+)"'
aare = rf'({_aare}|{_quoted_aare}|\(({_aare}|{_quoted_aare})\))'
aare_set = rf'({_aare}|{_quoted_aare}|\(({_aare}|{_quoted_aare})+\))'
def re_cond_set(x, y=None):
return rf'\s*({x}\s*=\s*(?P<{y or x}_cond_set>{aare_set}))\s*'
def re_cond(x, y=None):
return rf'\s*({x}\s*=\s*(?P<{y or x}_cond>{aare}))\s*'
access_flags = [
'create', 'bind', 'listen', 'accept', 'connect', 'shutdown', 'getattr', 'setattr', 'getopt', 'setopt', 'send',
'receive', 'r', 'w', 'rw'
]
join_access = r'(\s*(' + '|'.join(access_flags) + '))'
sep = r'\s*[\s,]\s*'
unix_accesses = rf'\s*(\s*(?P<accesses>\({join_access}({sep}{join_access})*\s*\)|{join_access}))?'
unix_rule_conds = rf'(\s*({re_cond_set("type")}|{re_cond_set("protocol")}))*'
unix_local_expr = rf'(\s*({re_cond("addr")}|{re_cond("label")}|{re_cond("attr")}|{re_cond("opt")}))*'
unix_peer_expr = rf'peer\s*=\s*\((\s*({re_cond("addr", "addr_peer")}|{re_cond("label", "label_peer")}))*\)'
RE_UNIX_DETAILS = re.compile(rf'^(\s*{unix_accesses})?(\s*{unix_rule_conds})?(\s*{unix_local_expr})?(\s*{unix_peer_expr})?\s*$')
class UnixRule(BaseRule):
'''Class to handle and store a single unix rule'''
# Nothing external should reference this class, all external users
# should reference the class field UnixRule.ALL
class __UnixAll(object):
pass
ALL = __UnixAll
rule_name = 'unix'
_match_re = RE_PROFILE_UNIX
def __init__(self, accesses, rule_conds, local_expr, peer_expr, audit=False, deny=False, allow_keyword=False, comment='', log_event=None):
super().__init__(audit=audit, deny=deny,
allow_keyword=allow_keyword,
comment=comment,
log_event=log_event)
if type(rule_conds) is tuple: # This comes from the logparser, we convert it to dicts
accesses = strip_parenthesis(accesses).replace(',', ' ').split()
rule_conds = _tuple_to_dict(rule_conds, ['type', 'protocol'])
local_expr = _tuple_to_dict(local_expr, ['addr', 'label', 'attr', 'opt'])
peer_expr = _tuple_to_dict(peer_expr, ['addr', 'label'])
self.accesses, self.all_accesses, unknown_items = check_and_split_list(accesses, access_flags, self.ALL, type(self).__name__, 'accesses')
if unknown_items:
raise AppArmorException(f'Invalid access in Unix rule: {unknown_items}')
self.rule_conds = _check_dict_keys(rule_conds, {'type', 'protocol'})
self.local_expr = _check_dict_keys(local_expr, {'addr', 'label', 'attr', 'opt'})
self.peer_expr = _check_dict_keys(peer_expr, {'addr', 'label'})
if not self.all_accesses and self.peer_expr != self.ALL and self.accesses & {'create', 'bind', 'listen', 'shutdown', 'getattr', 'setattr', 'getopt', 'setopt'}:
raise AppArmorException('Cannot use a peer_expr and an access in {create, bind, listen, shutdown, getattr, setattr, getopt, setopt} simultaneously')
self.can_glob = not (self.accesses or self.rule_conds or self.local_expr or self.peer_expr)
@classmethod
def _create_instance(cls, raw_rule, matches):
'''parse raw_rule and return instance of this class'''
audit, deny, allow_keyword, comment = parse_modifiers(matches)
rule_details = ''
if matches.group('details'):
rule_details = matches.group('details')
parsed = RE_UNIX_DETAILS.search(rule_details)
if not parsed:
raise AppArmorException('Cannot parse unix rule ' + raw_rule)
r = parsed.groupdict()
if r['accesses']:
accesses = strip_parenthesis(r['accesses']).replace(',', ' ').split()
else:
accesses = cls.ALL
rule_conds = _initialize_cond_dict(r, ['type', 'protocol'], '_cond_set')
local_expr = _initialize_cond_dict(r, ['addr', 'label', 'attr', 'opt'], '_cond')
peer_expr = _initialize_cond_dict(r, ['addr', 'label'], '_peer_cond')
else:
accesses = cls.ALL
rule_conds = cls.ALL
local_expr = cls.ALL
peer_expr = cls.ALL
return cls(accesses=accesses, rule_conds=rule_conds, local_expr=local_expr, peer_expr=peer_expr, audit=audit, deny=deny, allow_keyword=allow_keyword, comment=comment)
def get_clean(self, depth=0):
space = ' ' * depth
accesses = ' (%s)' % (', '.join(sorted(self.accesses))) if not self.all_accesses else ''
rule_conds = _print_dict_values(self.rule_conds)
local_expr = _print_dict_values(self.local_expr)
peer_expr = _print_dict_values(self.peer_expr, 'peer')
return f'{space}unix{self.modifiers_str()}{accesses}{rule_conds}{local_expr}{peer_expr},{self.comment}'
def _is_covered_localvars(self, other_rule):
if not self._is_covered_list(self.accesses, self.all_accesses, other_rule.accesses, other_rule.all_accesses, 'accesses'):
return False
if not self._is_covered_dict(self.rule_conds, other_rule.rule_conds):
return False
if not self._is_covered_dict(self.local_expr, other_rule.local_expr):
return False
if not self._is_covered_dict(self.peer_expr, other_rule.peer_expr):
return False
return True
def _is_equal_localvars(self, rule_obj, strict):
if self.accesses != rule_obj.accesses:
return False
if self.rule_conds != rule_obj.rule_conds:
return False
if self.local_expr != rule_obj.local_expr:
return False
if self.peer_expr != rule_obj.peer_expr:
return False
return True
def glob(self):
'''Change path to next possible glob'''
if self.peer_expr != self.ALL:
self.peer_expr = self.ALL
elif self.local_expr != self.ALL:
self.local_expr = self.ALL
elif self.rule_conds != self.ALL:
self.rule_conds = self.ALL
else: # not self.all_accesses:
self.accesses = None
self.all_accesses = True
self.raw_rule = None
def _logprof_header_localvars(self):
accesses = logprof_value_or_all(self.accesses, self.all_accesses)
rule_conds = logprof_value_or_all(self.rule_conds, self.rule_conds == UnixRule.ALL)
local_expr = logprof_value_or_all(self.local_expr, self.local_expr == UnixRule.ALL)
peer_expr = logprof_value_or_all(self.peer_expr, self.peer_expr == UnixRule.ALL)
return (
_('Accesses'), accesses,
_('Rule'), rule_conds,
_('Local'), local_expr,
_('Peer'), peer_expr,
)
def _is_covered_dict(self, d, other):
if d is self.ALL:
return True
elif other is self.ALL:
return False
for it in other:
if it not in d:
continue # No constraints on this item.
else:
if not self._is_covered_aare(AARE(d[it], False), False, AARE(other[it], False), False, it):
return False
return True
def _print_dict_values(d, prefix=None):
if d == UnixRule.ALL:
return ''
to_print = ' '.join(f'{k}={v}' for k, v in d.items())
if prefix:
return f' {prefix}=({to_print})'
else:
return f' {to_print}'
def _initialize_cond_dict(d, keys, suffix):
out = {
key: d[f'{key}{suffix}']
for key in keys
if f'{key}{suffix}' in d and d[f'{key}{suffix}'] is not None
}
return out if out != {} else UnixRule.ALL
def _check_dict_keys(d, possible_keys):
if d == UnixRule.ALL or d == {}:
return UnixRule.ALL
if not possible_keys >= d.keys():
raise AppArmorException(f'Incorrect key in dict {d}. Possible keys are {possible_keys},')
return d
def _tuple_to_dict(t, keys):
d = {}
for idx, k in enumerate(keys):
if t[idx] is not None:
d[k] = t[idx]
return d
class UnixRuleset(BaseRuleset):
'''Class to handle and store a collection of Unix rules'''