2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-30 22:05:27 +00:00

Merge utils: Simplify logparsing and rule creation from hashlog/event

utils: Simplify logparsing and rule creation from hashlog/event

- Allows to create all rules classes thanks to from_hashlog and hashlog_from_event
- These new functions simplify event/log parsing in logparser.py and aa.py

Signed-off-by: Maxime Bélair <maxime.belair@canonical.com>

MR: https://gitlab.com/apparmor/apparmor/-/merge_requests/1276
Approved-by: Christian Boltz <apparmor@cboltz.de>
Merged-by: Christian Boltz <apparmor@cboltz.de>
This commit is contained in:
Christian Boltz
2024-07-23 16:09:53 +00:00
27 changed files with 523 additions and 247 deletions

View File

@@ -0,0 +1 @@
[ 429.272003] audit: type=1400 audit(1720613712.153:168): apparmor="AUDIT" operation="userns_create" class="namespace" info="Userns create - transitioning profile" profile="unconfined" pid=5630 comm="unshare" requested="userns_create" target="unprivileged_userns" execpath="/usr/bin/unshare"

View File

@@ -0,0 +1,15 @@
START
File: testcase_userns_02.in
Event type: AA_RECORD_AUDIT
Audit ID: 1720613712.153:168
Operation: userns_create
Mask: userns_create
Profile: unconfined
Command: unshare
Name2: unprivileged_userns
Info: Userns create - transitioning profile
PID: 5630
Execpath: /usr/bin/unshare
Class: namespace
Epoch: 1720613712
Audit subid: 168

View File

@@ -0,0 +1,4 @@
/usr/bin/unshare {
audit userns create,
}

View File

@@ -40,20 +40,9 @@ from apparmor.regex import (
RE_PROFILE_HAT_DEF, RE_PROFILE_START,
RE_RULE_HAS_COMMA, parse_profile_start_line, re_match_include)
from apparmor.rule.abi import AbiRule
from apparmor.rule.capability import CapabilityRule
from apparmor.rule.change_profile import ChangeProfileRule
from apparmor.rule.dbus import DbusRule
from apparmor.rule.file import FileRule
from apparmor.rule.include import IncludeRule
from apparmor.rule.network import NetworkRule
from apparmor.rule.pivot_root import PivotRootRule
from apparmor.rule.ptrace import PtraceRule
from apparmor.rule.signal import SignalRule
from apparmor.rule.userns import UserNamespaceRule
from apparmor.rule.mqueue import MessageQueueRule
from apparmor.rule.io_uring import IOUringRule
from apparmor.rule.mount import MountRule
from apparmor.rule.unix import UnixRule
from apparmor.logparser import ReadLog
from apparmor.translations import init_translation
_ = init_translation()
@@ -1610,137 +1599,14 @@ def collapse_log(hashlog, ignore_null_profiles=True):
# with execs in ix mode, we already have ProfileStorage initialized and should keep the content it already has
log_dict[aamode][final_name] = ProfileStorage(profile, hat, 'collapse_log()')
for path in hashlog[aamode][full_profile]['path'].keys():
for owner in hashlog[aamode][full_profile]['path'][path]:
mode = set(hashlog[aamode][full_profile]['path'][path][owner].keys())
# logparser sums up multiple log events, so both 'a' and 'w' can be present
if 'a' in mode and 'w' in mode:
mode.remove('a')
file_event = FileRule(path, mode, None, FileRule.ALL, owner=owner, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'file', file_event):
log_dict[aamode][final_name]['file'].add(file_event)
# TODO: check for existing rules with this path, and merge them into one rule
for cap in hashlog[aamode][full_profile]['capability'].keys():
cap_event = CapabilityRule(cap, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'capability', cap_event):
log_dict[aamode][final_name]['capability'].add(cap_event)
for cp in hashlog[aamode][full_profile]['change_profile'].keys():
cp_event = ChangeProfileRule(None, ChangeProfileRule.ALL, cp, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'change_profile', cp_event):
log_dict[aamode][final_name]['change_profile'].add(cp_event)
dbus = hashlog[aamode][full_profile]['dbus']
for access in dbus: # noqa: E271
for bus in dbus[access]: # noqa: E271
for path in dbus[access][bus]: # noqa: E271
for name in dbus[access][bus][path]: # noqa: E271
for interface in dbus[access][bus][path][name]: # noqa: E271
for member in dbus[access][bus][path][name][interface]: # noqa: E271
for peer_profile in dbus[access][bus][path][name][interface][member]:
# Depending on the access type, not all parameters are allowed.
# Ignore them, even if some of them appear in the log.
# Also, the log doesn't provide a peer name, therefore always use ALL.
if access in ('send', 'receive'):
dbus_event = DbusRule(access, bus, path, DbusRule.ALL, interface, member, DbusRule.ALL, peer_profile, log_event=True)
elif access == 'bind':
dbus_event = DbusRule(access, bus, DbusRule.ALL, name, DbusRule.ALL, DbusRule.ALL, DbusRule.ALL, DbusRule.ALL, log_event=True)
elif access == 'eavesdrop':
dbus_event = DbusRule(access, bus, DbusRule.ALL, DbusRule.ALL, DbusRule.ALL, DbusRule.ALL, DbusRule.ALL, DbusRule.ALL, log_event=True)
else:
raise AppArmorBug('unexpected dbus access: {}'.format(access))
if not hat_exists or not is_known_rule(aa[profile][hat], 'dbus', dbus_event):
log_dict[aamode][final_name]['dbus'].add(dbus_event)
nd = hashlog[aamode][full_profile]['network']
for access in nd.keys():
for family in nd[access].keys():
for sock_type in nd[access][family].keys():
for protocol in nd[access][family][sock_type].keys():
for local_event in nd[access][family][sock_type][protocol].keys():
for peer_event in nd[access][family][sock_type][protocol][local_event].keys():
net_event = NetworkRule(access, family, sock_type, local_event, peer_event, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'network', net_event):
log_dict[aamode][final_name]['network'].add(net_event)
ptrace = hashlog[aamode][full_profile]['ptrace']
for peer in ptrace.keys():
if '//null-' in peer:
continue # ignore null-* peers
for access in ptrace[peer].keys():
ptrace_event = PtraceRule(access, peer, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'ptrace', ptrace_event):
log_dict[aamode][final_name]['ptrace'].add(ptrace_event)
sig = hashlog[aamode][full_profile]['signal']
for peer in sig.keys():
if '//null-' in peer:
continue # ignore null-* peers
for access in sig[peer].keys():
for signal in sig[peer][access].keys():
signal_event = SignalRule(access, signal, peer, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'signal', signal_event):
log_dict[aamode][final_name]['signal'].add(signal_event)
userns = hashlog[aamode][full_profile]['userns']
for access in userns.keys():
userns_event = UserNamespaceRule(access)
if not hat_exists or not is_known_rule(aa[profile][hat], 'userns', userns_event):
log_dict[aamode][final_name]['userns'].add(userns_event)
mqueue = hashlog[aamode][full_profile]['mqueue']
for access in mqueue.keys():
for mqueue_type in mqueue[access]:
for mqueue_name in mqueue[access][mqueue_type]:
mqueue_event = MessageQueueRule(access, mqueue_type, MessageQueueRule.ALL, mqueue_name, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'mqueue', mqueue_event):
log_dict[aamode][final_name]['mqueue'].add(mqueue_event)
io_uring = hashlog[aamode][full_profile]['io_uring']
for access in io_uring.keys():
for label in io_uring[access]:
if not label:
label = IOUringRule.ALL
io_uring_event = IOUringRule(access, label, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'io_uring', io_uring_event):
log_dict[aamode][final_name]['io_uring'].add(io_uring_event)
unix = hashlog[aamode][full_profile]['unix']
for unix_access in unix.keys():
for unix_rule in unix[unix_access]:
for unix_local in unix[unix_access][unix_rule]:
for unix_peer in unix[unix_access][unix_rule][unix_local]:
unix_event = UnixRule(unix_access, unix_rule, unix_local, unix_peer)
if not hat_exists or not is_known_rule(aa[profile][hat], 'unix', unix_event):
log_dict[aamode][final_name]['unix'].add(unix_event)
mount = hashlog[aamode][full_profile]['mount']
for operation, operation_val in mount.items():
for options, options_val in operation_val.items():
for fstype, fstype_value in options_val.items():
for dest, dest_value in fstype_value.items():
for source, source_value in dest_value.items():
_options = (options[0], options[1].split(', ')) if options is not None else MountRule.ALL
_fstype = (fstype[0], fstype[1].split(', ')) if fstype is not None else MountRule.ALL
_source = source if source is not None else MountRule.ALL
_dest = dest if dest is not None else MountRule.ALL
mount_event = MountRule(operation=operation, fstype=_fstype, options=_options, source=_source, dest=_dest)
if not hat_exists or not is_known_rule(aa[profile][hat], 'mount', mount_event):
log_dict[aamode][final_name]['mount'].add(mount_event)
pivot_root = hashlog[aamode][full_profile]['pivot_root']
for oldroot in pivot_root.keys():
for newroot in pivot_root[oldroot]:
pivot_root_event = PivotRootRule(oldroot, newroot, PivotRootRule.ALL, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'pivot_root', pivot_root_event):
log_dict[aamode][final_name]['pivot_root'].add(pivot_root_event)
for ev_type, ev_class in ReadLog.ruletypes.items():
if ev_class == FileRule: # TODO: fix the name in the hashlog for FileRule
ev_type_hashlog = 'path'
else:
ev_type_hashlog = ev_type
for event in ev_class.from_hashlog(hashlog[aamode][full_profile][ev_type_hashlog]):
if not hat_exists or not is_known_rule(aa[profile][hat], ev_type, event):
log_dict[aamode][final_name][ev_type].add(event)
return log_dict

View File

@@ -19,6 +19,19 @@ import time
import LibAppArmor
from apparmor.common import AppArmorBug, AppArmorException, DebugLogger, hasher, open_file_read, split_name
from apparmor.rule.capability import CapabilityRule
from apparmor.rule.change_profile import ChangeProfileRule
from apparmor.rule.dbus import DbusRule
from apparmor.rule.file import FileRule
from apparmor.rule.io_uring import IOUringRule
from apparmor.rule.mount import MountRule
from apparmor.rule.mqueue import MessageQueueRule
from apparmor.rule.network import NetworkRule
from apparmor.rule.pivot_root import PivotRootRule
from apparmor.rule.ptrace import PtraceRule
from apparmor.rule.signal import SignalRule
from apparmor.rule.unix import UnixRule
from apparmor.rule.userns import UserNamespaceRule
from apparmor.translations import init_translation
_ = init_translation()
@@ -29,6 +42,22 @@ class ReadLog:
# used to pre-filter log lines so that we hand over only relevant lines to LibAppArmor parsing
RE_LOG_ALL = re.compile('apparmor=|operation=|type=AVC')
ruletypes = {
'capability': CapabilityRule,
'change_profile': ChangeProfileRule,
'dbus': DbusRule,
'file': FileRule,
'ptrace': PtraceRule,
'signal': SignalRule,
'userns': UserNamespaceRule,
'mqueue': MessageQueueRule,
'io_uring': IOUringRule,
'mount': MountRule,
'unix': UnixRule,
'network': NetworkRule,
'pivot_root': PivotRootRule,
}
def __init__(self, filename, active_profiles, profile_dir):
self.filename = filename
self.profile_dir = profile_dir
@@ -82,11 +111,65 @@ class ReadLog:
self.next_log_entry = None
return log_entry
def parse_event(self, msg):
"""Parse the event from log into key value pairs"""
msg = msg.strip()
self.debug_logger.info('parse_event: %s', msg)
event = LibAppArmor.parse_record(msg)
def get_event_type(self, e):
if e['operation'] == 'exec':
return 'exec'
elif e['class'] and e['class'] == 'namespace':
if e['denied_mask'] and e['denied_mask'].startswith('userns_'):
return 'userns'
elif not e['denied_mask'] and e['request_mask'].startswith('userns_'): # To support transition to special userns profiles
return 'userns'
elif e['class'] and e['class'].endswith('mqueue'):
return 'mqueue'
elif e['class'] and e['class'] == 'io_uring':
return 'io_uring'
elif e['class'] and e['class'] == 'mount' or e['operation'] == 'mount':
return 'mount'
elif e['operation'] and e['operation'] == 'pivotroot':
return 'pivot_root'
elif e['class'] and e['class'] == 'net' and e['family'] and e['family'] == 'unix':
return 'unix'
elif self.op_type(e) == 'file':
return 'path'
elif e['operation'] == 'capable':
return 'capability'
elif self.op_type(e) == 'net':
return 'network'
elif e['operation'] == 'change_hat':
return 'change_hat'
elif e['operation'] == 'change_profile':
return 'change_profile'
elif e['operation'] == 'ptrace':
return 'ptrace'
elif e['operation'] == 'signal':
return 'signal'
elif e['operation'] and e['operation'].startswith('dbus_'):
return 'dbus'
else:
self.debug_logger.debug('UNHANDLED: %s', e)
return None
def get_rule_type(self, r):
for k, v in self.ruletypes.items():
if v.match(r):
return k, v
return None
def create_rule_from_ev(self, ev):
ruletype = self.ruletypes[self.get_event_type(ev)]
try:
return ruletype.create_from_ev(ev)
except Exception:
return None
def parse_record(self, event):
"""Parse the record from LibAppArmor into key value pairs"""
ev = dict()
ev['resource'] = event.info
ev['active_hat'] = event.active_hat
@@ -114,44 +197,47 @@ class ReadLog:
ev['fsuid'] = event.fsuid
ev['ouid'] = event.ouid
if ev['operation'] and ev['operation'] == 'signal':
ev['signal'] = event.signal
ev['peer'] = event.peer
elif ev['operation'] and ev['operation'] == 'ptrace':
ev['peer'] = event.peer
elif ev['operation'] and ev['operation'] == 'pivotroot':
ev['src_name'] = event.src_name
elif ev['operation'] and ev['operation'] == 'mount':
ev['flags'] = event.flags
ev['fs_type'] = event.fs_type
ev['src_name'] = event.src_name
elif ev['operation'] and (ev['operation'] == 'umount'):
ev['flags'] = event.flags
ev['fs_type'] = event.fs_type
elif ev['class'] and ev['class'] == 'net' or self.op_type(ev) == 'net':
ev['accesses'] = event.requested_mask
ev['port'] = event.net_local_port or None
ev['remote_port'] = event.net_foreign_port or None
if ev['family'] and ev['family'] == 'unix':
match self.get_event_type(ev):
case 'signal':
ev['signal'] = event.signal
ev['peer'] = event.peer
case 'ptrace':
ev['peer'] = event.peer
case 'pivot_root':
ev['src_name'] = event.src_name
case 'mount':
ev['flags'] = event.flags
ev['fs_type'] = event.fs_type
if ev['operation'] and ev['operation'] == 'mount':
ev['src_name'] = event.src_name # mount can have a source but not umount.
case 'userns':
ev['execpath'] = event.execpath
ev['comm'] = event.comm
case 'network':
ev['accesses'] = event.requested_mask
ev['port'] = event.net_local_port or None
ev['remote_port'] = event.net_foreign_port or None
ev['addr'] = event.net_local_addr
ev['peer_addr'] = event.net_foreign_addr
ev['addr'] = event.net_local_addr
ev['peer_addr'] = event.net_foreign_addr
case 'unix':
ev['accesses'] = event.requested_mask
ev['port'] = event.net_local_port or None
ev['remote_port'] = event.net_foreign_port or None
ev['addr'] = event.net_addr
ev['peer_addr'] = event.peer_addr
ev['peer'] = event.peer
ev['peer_profile'] = event.peer_profile
else:
ev['addr'] = event.net_local_addr
ev['peer_addr'] = event.net_foreign_addr
case 'dbus':
ev['peer_profile'] = event.peer_profile
ev['bus'] = event.dbus_bus
ev['path'] = event.dbus_path
ev['interface'] = event.dbus_interface
ev['member'] = event.dbus_member
elif ev['operation'] and ev['operation'].startswith('dbus_'):
ev['peer_profile'] = event.peer_profile
ev['bus'] = event.dbus_bus
ev['path'] = event.dbus_path
ev['interface'] = event.dbus_interface
ev['member'] = event.dbus_member
elif ev['operation'] and ev['operation'].startswith('uring_'):
ev['peer_profile'] = event.peer_profile
LibAppArmor.free_record(event)
case 'io_uring':
ev['peer_profile'] = event.peer_profile
if not ev['time']:
ev['time'] = int(time.time())
@@ -181,6 +267,16 @@ class ReadLog:
else:
return None
def parse_event(self, msg):
"""Parse the event from log into key value pairs"""
msg = msg.strip()
self.debug_logger.info('parse_event: %s', msg)
event = LibAppArmor.parse_record(msg)
ev = self.parse_record(event)
LibAppArmor.free_record(event)
return ev
def parse_event_for_tree(self, e):
aamode = e.get('aamode', 'UNKNOWN')
@@ -215,85 +311,40 @@ class ReadLog:
self.hashlog[aamode][full_profile]['exec'][e['name']][e['name2']] = True
return
# TODO: replace all the if conditions with a loop over 'ruletypes'
elif e['class'] and e['class'] == 'namespace':
if e['denied_mask'].startswith('userns_'):
self.hashlog[aamode][full_profile]['userns'][e['denied_mask'][7:]] = True # [7:] removes the 'userns_' prefix
UserNamespaceRule.hashlog_from_event(self.hashlog[aamode][full_profile]['userns'], e)
return
elif e['class'] and e['class'].endswith('mqueue'):
mqueue_type = e['class'].partition('_')[0]
self.hashlog[aamode][full_profile]['mqueue'][e['denied_mask']][mqueue_type][e['name']] = True
MessageQueueRule.hashlog_from_event(self.hashlog[aamode][full_profile]['mqueue'], e)
return
elif e['class'] and e['class'] == 'io_uring':
self.hashlog[aamode][full_profile]['io_uring'][e['denied_mask']][e['peer_profile']] = True
IOUringRule.hashlog_from_event(self.hashlog[aamode][full_profile]['io_uring'], e)
return
elif e['class'] and e['class'] == 'mount' or e['operation'] == 'mount':
if e['flags'] is not None:
e['flags'] = ('=', e['flags'])
if e['fs_type'] is not None:
e['fs_type'] = ('=', e['fs_type'])
if e['operation'] == 'mount':
self.hashlog[aamode][full_profile]['mount'][e['operation']][e['flags']][e['fs_type']][e['name']][e['src_name']] = True
else: # Umount
self.hashlog[aamode][full_profile]['mount'][e['operation']][e['flags']][e['fs_type']][e['name']][None] = True
return
MountRule.hashlog_from_event(self.hashlog[aamode][full_profile]['mount'], e)
elif e['operation'] and e['operation'] == 'pivotroot':
# TODO: can the log contain the target profile?
self.hashlog[aamode][full_profile]['pivot_root'][e['src_name']][e['name']] = True
PivotRootRule.hashlog_from_event(self.hashlog[aamode][full_profile]['pivot_root'], e)
elif e['class'] and e['class'] == 'net' and e['family'] and e['family'] == 'unix':
rule = (e['sock_type'], None) # Protocol is not supported yet.
local = (e['addr'], None, e['attr'], None)
peer = (e['peer_addr'], e['peer_profile'])
self.hashlog[aamode][full_profile]['unix'][e['denied_mask']][rule][local][peer] = True
UnixRule.hashlog_from_event(self.hashlog[aamode][full_profile]['unix'], e)
return
elif self.op_type(e) == 'file':
# Map c (create) and d (delete) to w (logging is more detailed than the profile language)
dmask = e['denied_mask']
dmask = dmask.replace('c', 'w')
dmask = dmask.replace('d', 'w')
owner = False
if '::' in dmask:
# old log styles used :: to indicate if permissions are meant for owner or other
(owner_d, other_d) = dmask.split('::')
if owner_d and other_d:
raise AppArmorException('Found log event with both owner and other permissions. Please open a bugreport!')
if owner_d:
dmask = owner_d
owner = True
else:
dmask = other_d
if e.get('ouid') is not None and e['fsuid'] == e['ouid']:
# in current log style, owner permissions are indicated by a match of fsuid and ouid
owner = True
if 'x' in dmask and dmask != 'x':
dmask = dmask.replace('x', '') # if dmask contains x and another mode, drop x here - we should see a separate exec event
for perm in dmask:
if perm in 'mrwalk': # intentionally not allowing 'x' here
self.hashlog[aamode][full_profile]['path'][e['name']][owner][perm] = True
else:
raise AppArmorException(_('Log contains unknown mode %s') % dmask)
return
FileRule.hashlog_from_event(self.hashlog[aamode][full_profile]['path'], e)
elif e['operation'] == 'capable':
self.hashlog[aamode][full_profile]['capability'][e['name']] = True
CapabilityRule.hashlog_from_event(self.hashlog[aamode][full_profile]['capability'], e)
return
elif self.op_type(e) == 'net':
local = (e['addr'], e['port'])
peer = (e['peer_addr'], e['remote_port'])
self.hashlog[aamode][full_profile]['network'][e['accesses']][e['family']][e['sock_type']][e['protocol']][local][peer] = True
NetworkRule.hashlog_from_event(self.hashlog[aamode][full_profile]['network'], e)
return
elif e['operation'] == 'change_hat':
@@ -304,7 +355,7 @@ class ReadLog:
return
elif e['operation'] == 'change_profile':
self.hashlog[aamode][full_profile]['change_profile'][e['name2']] = True
ChangeProfileRule.hashlog_from_event(self.hashlog[aamode][full_profile]['change_profile'], e)
return
elif e['operation'] == 'ptrace':
@@ -315,15 +366,15 @@ class ReadLog:
self.debug_logger.debug('ignored garbage ptrace event with empty denied_mask')
return
self.hashlog[aamode][full_profile]['ptrace'][e['peer']][e['denied_mask']] = True
PtraceRule.hashlog_from_event(self.hashlog[aamode][full_profile]['ptrace'], e)
return
elif e['operation'] == 'signal':
self.hashlog[aamode][full_profile]['signal'][e['peer']][e['denied_mask']][e['signal']] = True
SignalRule.hashlog_from_event(self.hashlog[aamode][full_profile]['signal'], e)
return
elif e['operation'] and e['operation'].startswith('dbus_'):
self.hashlog[aamode][full_profile]['dbus'][e['denied_mask']][e['bus']][e['path']][e['name']][e['interface']][e['member']][e['peer_profile']] = True
DbusRule.hashlog_from_event(self.hashlog[aamode][full_profile]['dbus'], e)
return
else:

View File

@@ -16,7 +16,7 @@
from abc import ABCMeta, abstractmethod
from apparmor.aare import AARE
from apparmor.common import AppArmorBug, AppArmorException
from apparmor.common import AppArmorBug, AppArmorException, hasher
from apparmor.regex import strip_quotes
from apparmor.translations import init_translation
@@ -116,6 +116,38 @@ class BaseRule(metaclass=ABCMeta):
required to be implemented by subclasses; raise exception if not"""
raise NotImplementedError("'%s' needs to implement _create_instance(), but didn't" % (str(cls)))
@staticmethod
def generate_rules_from_hashlog(hashlog, nb_keys):
"""yields all key sequences from a hashlog of depth nb_keys"""
stack = [(hashlog, [], nb_keys)]
while stack:
items, path, depth = stack.pop()
if depth == 0:
yield path
continue
for next_key in items:
stack.append((items[next_key], path + [next_key], depth - 1))
@classmethod
def create_from_ev(cls, ev):
"""returns a rule that would allow an event"""
hl = hasher()
cls.hashlog_from_event(hl, ev)
return next(cls.from_hashlog(hl))
@staticmethod
def hashlog_from_event(hl, ev):
"""stores an event in the hashlog"""
raise NotImplementedError('hashlog_from_event should be called on a rule class and not directly on BaseRule.')
@classmethod
def from_hashlog(cls, hl):
"""constructs and yields all rules that would allow denials stored in a hashlog"""
raise NotImplementedError("'%s' needs to implement from_hashlog(), but didn't" % (str(cls)))
@abstractmethod
def get_clean(self, depth=0):
"""return clean rule (with default formatting, and leading whitespace as specified in the depth parameter)"""

View File

@@ -141,6 +141,15 @@ class CapabilityRule(BaseRule):
return _('Capability'), cap_txt
@staticmethod
def hashlog_from_event(hl, e):
hl[e['name']] = True
@classmethod
def from_hashlog(cls, hl):
for cap in hl.keys():
yield cls(cap, log_event=True)
class CapabilityRuleset(BaseRuleset):
"""Class to handle and store a collection of capability rules"""

View File

@@ -174,6 +174,15 @@ class ChangeProfileRule(BaseRule):
))
return headers
@staticmethod
def hashlog_from_event(hl, e):
hl[e['name2']] = True
@classmethod
def from_hashlog(cls, hl):
for cp in hl.keys():
yield cls(None, cls.ALL, cp, log_event=True)
class ChangeProfileRuleset(BaseRuleset):
"""Class to handle and store a collection of change_profile rules"""

View File

@@ -308,6 +308,25 @@ class DbusRule(BaseRule):
_('Peer label'), peerlabel,
)
@staticmethod
def hashlog_from_event(hl, e):
hl[e['denied_mask']][e['bus']][e['path']][e['name']][e['interface']][e['member']][e['peer_profile']] = True
@classmethod
def from_hashlog(cls, hl):
for access, bus, path, name, interface, member, peer_profile in BaseRule.generate_rules_from_hashlog(hl, 7):
# Depending on the access type, not all parameters are allowed.
# Ignore them, even if some of them appear in the log.
# Also, the log doesn't provide a peer name, therefore always use ALL.
if access in ('send', 'receive'):
yield cls(access, bus, path, cls.ALL, interface, member, cls.ALL, peer_profile, log_event=True)
elif access == 'bind':
yield cls(access, bus, cls.ALL, name, cls.ALL, cls.ALL, cls.ALL, cls.ALL, log_event=True)
elif access == 'eavesdrop':
yield cls(access, bus, cls.ALL, cls.ALL, cls.ALL, cls.ALL, cls.ALL, cls.ALL, log_event=True)
else:
raise AppArmorBug('unexpected dbus access: {}'.format(access))
class DbusRuleset(BaseRuleset):
"""Class to handle and store a collection of dbus rules"""

View File

@@ -425,6 +425,50 @@ class FileRule(BaseRule):
self.path = AARE(newpath, True) # might raise AppArmorException if the new path doesn't start with / or a variable
self.raw_rule = None
@staticmethod
def hashlog_from_event(hl, e):
# Map c (create) and d (delete) to w (logging is more detailed than the profile language)
dmask = e['denied_mask']
dmask = dmask.replace('c', 'w')
dmask = dmask.replace('d', 'w')
owner = False
if '::' in dmask:
# old log styles used :: to indicate if permissions are meant for owner or other
(owner_d, other_d) = dmask.split('::')
if owner_d and other_d:
raise AppArmorException(
'Found log event with both owner and other permissions. Please open a bugreport!')
if owner_d:
dmask = owner_d
owner = True
else:
dmask = other_d
if e.get('ouid') is not None and e['fsuid'] == e['ouid']:
# in current log style, owner permissions are indicated by a match of fsuid and ouid
owner = True
if 'x' in dmask and dmask != 'x':
dmask = dmask.replace('x', '') # if dmask contains x and another mode, drop x here - we should see a separate exec event
for perm in dmask:
if perm in 'mrwalk': # intentionally not allowing 'x' here
hl[e['name']][owner][perm] = True
else:
raise AppArmorException(_('Log contains unknown mode %s') % dmask)
@classmethod
def from_hashlog(cls, hl):
for path, owner in BaseRule.generate_rules_from_hashlog(hl, 2):
mode = set(hl[path][owner].keys())
# logparser sums up multiple log events, so both 'a' and 'w' can be present
if 'a' in mode and 'w' in mode:
mode.remove('a')
yield cls(path, mode, None, FileRule.ALL, owner=owner, log_event=True)
# TODO: check for existing rules with this path, and merge them into one rule
class FileRuleset(BaseRuleset):
"""Class to handle and store a collection of file rules"""

View File

@@ -153,6 +153,17 @@ class IOUringRule(BaseRule):
_('Label'), label,
)
@staticmethod
def hashlog_from_event(hl, e):
hl[e['denied_mask']][e['peer_profile']] = True
@classmethod
def from_hashlog(cls, hl):
for access, label in BaseRule.generate_rules_from_hashlog(hl, 2):
if not label:
label = IOUringRule.ALL
yield cls(access, label, log_event=True)
class IOUringRuleset(BaseRuleset):
'''Class to handle and store a collection of io_uring rules'''

View File

@@ -261,6 +261,26 @@ class MountRule(BaseRule):
return True
@staticmethod
def hashlog_from_event(hl, e):
if e['flags'] is not None:
e['flags'] = ('=', e['flags'])
if e['fs_type'] is not None:
e['fs_type'] = ('=', e['fs_type'])
if e['operation'] == 'mount':
hl[e['operation']][e['flags']][e['fs_type']][e['name']][e['src_name']] = True
else: # Umount
hl[e['operation']][e['flags']][e['fs_type']][e['name']][None] = True
@classmethod
def from_hashlog(cls, hl):
for operation, options, fstype, dest, source in cls.generate_rules_from_hashlog(hl, 5):
_options = (options[0], options[1].split(', ')) if options is not None else MountRule.ALL
_fstype = (fstype[0], fstype[1].split(', ')) if fstype is not None else MountRule.ALL
_source = source if source is not None else MountRule.ALL
_dest = dest if dest is not None else MountRule.ALL
yield cls(operation=operation, fstype=_fstype, options=_options, source=_source, dest=_dest)
def glob(self):
'''Change path to next possible glob'''
if self.all_source and self.all_options:

View File

@@ -219,6 +219,16 @@ class MessageQueueRule(BaseRule):
_('Message queue name'), mqueue_name
)
@staticmethod
def hashlog_from_event(hl, e):
mqueue_type = e['class'].partition('_')[0]
hl[e['denied_mask']][mqueue_type][e['name']] = True
@classmethod
def from_hashlog(cls, hl):
for access, mqueue_type, mqueue_name in BaseRule.generate_rules_from_hashlog(hl, 3):
yield cls(access, mqueue_type, MessageQueueRule.ALL, mqueue_name, log_event=True)
class MessageQueueRuleset(BaseRuleset):
'''Class to handle and store a collection of mqueue rules'''

View File

@@ -291,6 +291,17 @@ class NetworkRule(BaseRule):
_('Peer'), peer_expr,
)
@staticmethod
def hashlog_from_event(hl, e):
local = (e['addr'], e['port'])
peer = (e['peer_addr'], e['remote_port'])
hl[e['accesses']][e['family']][e['sock_type']][e['protocol']][local][peer] = True
@classmethod
def from_hashlog(cls, hl):
for access, family, sock_type, protocol, local_event, peer_event in BaseRule.generate_rules_from_hashlog(hl, 6):
yield cls(access, family, sock_type, local_event, peer_event, log_event=True)
class NetworkRuleset(BaseRuleset):
"""Class to handle and store a collection of network rules"""

View File

@@ -193,6 +193,16 @@ class PivotRootRule(BaseRule):
_('Target profile'), profile_name,
)
@staticmethod
def hashlog_from_event(hl, e):
# TODO: can the log contain the target profile?
hl[e['src_name']][e['name']] = True
@classmethod
def from_hashlog(cls, hl):
for oldroot, newroot in BaseRule.generate_rules_from_hashlog(hl, 2):
yield cls(oldroot, newroot, cls.ALL, log_event=True)
class PivotRootRuleset(BaseRuleset):
'''Class to handle and store a collection of pivot_root rules'''

View File

@@ -156,6 +156,19 @@ class PtraceRule(BaseRule):
_('Peer'), peer,
)
@staticmethod
def hashlog_from_event(hl, e):
hl[e['peer']][e['denied_mask']] = True
@classmethod
def from_hashlog(cls, hl):
for peer in hl.keys():
if '//null-' in peer:
continue # ignore null-* peers
for access in hl[peer].keys():
yield cls(access, peer, log_event=True)
class PtraceRuleset(BaseRuleset):
"""Class to handle and store a collection of ptrace rules"""

View File

@@ -217,6 +217,19 @@ class SignalRule(BaseRule):
_('Peer'), peer,
)
@staticmethod
def hashlog_from_event(hl, e):
hl[e['peer']][e['denied_mask']][e['signal']] = True
@classmethod
def from_hashlog(cls, hl):
for peer in hl.keys():
if '//null-' in peer:
continue # ignore null-* peers
for access, signal in BaseRule.generate_rules_from_hashlog(hl[peer], 2):
yield cls(access, signal, peer, log_event=True)
class SignalRuleset(BaseRuleset):
"""Class to handle and store a collection of signal rules"""

View File

@@ -158,6 +158,19 @@ class UnixRule(BaseRule):
return True
@staticmethod
def hashlog_from_event(hl, e):
rule = (e['sock_type'], None) # Protocol is not supported yet.
local = (e['addr'], None, e['attr'], None)
peer = (e['peer_addr'], e['peer_profile'])
hl[e['denied_mask']][rule][local][peer] = True
@classmethod
def from_hashlog(cls, hl):
for denied_mask, rule, local, peer in BaseRule.generate_rules_from_hashlog(hl, 4):
yield cls(denied_mask, rule, local, peer)
def glob(self):
'''Change path to next possible glob'''
if self.peer_expr != self.ALL:

View File

@@ -77,6 +77,18 @@ class UserNamespaceRule(BaseRule):
return cls(access, audit=audit, deny=deny,
allow_keyword=allow_keyword, comment=comment)
@staticmethod
def hashlog_from_event(hl, e):
if e['denied_mask']:
hl[e['denied_mask'][7:]] = True # [7:] removes the 'userns_' prefix
else:
hl[e['request_mask'][7:]] = True # To support transition to special profiles
@classmethod
def from_hashlog(cls, hl):
for access in BaseRule.generate_rules_from_hashlog(hl, 1):
yield cls(access)
def get_clean(self, depth=0):
'''return rule (in clean/default formatting)'''

View File

@@ -13,7 +13,7 @@ import re
import unittest
import apparmor.severity as severity
from apparmor.common import AppArmorBug
from apparmor.common import AppArmorBug, hasher
from apparmor.rule import BaseRule, parse_modifiers
from common_test import AATest, setup_all_loops
@@ -98,6 +98,15 @@ class TestBaserule(AATest):
with self.assertRaises(NotImplementedError):
obj.store_edit('/foo')
def test_from_hashlog(self):
obj = self.ValidSubclass()
with self.assertRaises(NotImplementedError):
obj.from_hashlog(hasher())
def test_hashlog_from_event(self):
with self.assertRaises(NotImplementedError):
BaseRule.hashlog_from_event(None, None)
setup_all_loops(__name__)
if __name__ == '__main__':

View File

@@ -376,6 +376,13 @@ class InvalidDbusTest(AATest):
with self.assertRaises(AppArmorBug):
obj.get_clean(1)
def test_invalid_event_access(self):
parser = ReadLog('', '', '')
event = 'type=USER_AVC msg=audit(1375323372.644:157): pid=363 uid=102 auid=4294967295 ses=4294967295 msg=\'apparmor="DENIED" operation="dbus_method_call" bus="system" name="org.freedesktop.DBus" path="/org/freedesktop/DBus" interface="org.freedesktop.DBus" member="Hello" mask="invalid_access" pid=2833 profile="/tmp/apparmor-2.8.0/tests/regression/apparmor/dbus_service" peer_profile="unconfined" exe="/bin/dbus-daemon" sauid=102 hostname=? addr=? terminal=?\''
ev = parser.parse_event(event)
with self.assertRaises(AppArmorBug):
DbusRule.create_from_ev(ev)
class WriteDbusTest(AATest):
def _run_test(self, rawrule, expected):

View File

@@ -17,7 +17,7 @@ import unittest
from collections import namedtuple
import apparmor.severity as severity
from apparmor.common import AppArmorBug, AppArmorException
from apparmor.common import AppArmorBug, AppArmorException, hasher
from apparmor.logparser import ReadLog
from apparmor.rule.file import FileRule, FileRuleset
from apparmor.translations import init_translation
@@ -1219,6 +1219,30 @@ class FileGetExecConflictRules_1(AATest):
self. assertEqual(conflicts.get_clean(), expected)
class FileModeTest(AATest):
def test_write_append(self):
parser = ReadLog('', '', '')
events = [
'[ 9614.885136] audit: type=1400 audit(1720429924.397:191): apparmor="DENIED" operation="open" class="file" profile="/home/user/test/a" name="/home/user/test/foo" pid=24460 comm="a" requested_mask="w" denied_mask="w" fsuid=1000 ouid=1000',
'[ 9614.885149] audit: type=1400 audit(1720429924.397:192): apparmor="DENIED" operation="open" class="file" profile="/home/user/test/a" name="/home/user/test/foo" pid=24460 comm="a" requested_mask="a" denied_mask="a" fsuid=1000 ouid=1000'
]
hl = hasher()
for raw_ev in events:
ev = parser.parse_event(raw_ev)
FileRule.hashlog_from_event(hl, ev)
expected = {'/home/user/test/foo': {True: {'w': True, 'a': True}}}
self.assertEqual(hl, expected)
fr = FileRule.from_hashlog(hl)
expected = FileRule('/home/user/test/foo', 'w', None, FileRule.ALL, True)
self.assertTrue(expected.is_equal(next(fr)))
with self.assertRaises(StopIteration):
next(fr)
setup_all_loops(__name__)
if __name__ == '__main__':
unittest.main(verbosity=1)

View File

@@ -100,6 +100,20 @@ class TestParseEvent(AATest):
self.assertIsNotNone(ReadLog.RE_LOG_ALL.search(event))
def test_get_rule_type(self):
rules = [
('mount fstype=bpf options=(rw) random_label -> /sys/fs/bpf/,', 'mount'),
('unix send addr=@foo{a,b} peer=(label=splat),', 'unix'),
('userns create, # cmt', 'userns'),
('allow /tmp/foo ra,', 'file'),
('file rwix /foo,', 'file'),
('signal set=quit peer=unconfined,', 'signal')
]
for r, exp in rules:
self.assertEqual(self.parser.get_rule_type(r)[0], exp)
self.assertEqual(self.parser.get_rule_type('invalid rule,'), None)
class TestParseEventForTreeInvalid(AATest):
tests = (

View File

@@ -16,7 +16,7 @@
import unittest
from collections import namedtuple
from apparmor.common import AppArmorBug, AppArmorException
from apparmor.common import AppArmorBug, AppArmorException, hasher
from apparmor.logparser import ReadLog
from apparmor.rule.ptrace import PtraceRule, PtraceRuleset
from apparmor.translations import init_translation
@@ -131,6 +131,23 @@ class PtraceTestParseFromLog(PtraceTest):
obj.get_raw(1),
' ptrace tracedby peer=/home/ubuntu/bzr/apparmor/tests/regression/apparmor/ptrace,')
def test_null_ptrace_from(self):
log = 'type=AVC msg=audit(1495217772.047:4471): apparmor="DENIED" operation="ptrace" profile="/usr/bin/pidgin" pid=21704 comm="pidgin" peer="//null-"'
parser = ReadLog('', '', '')
hl = hasher()
ev = parser.parse_event(log)
PtraceRule.hashlog_from_event(hl, ev)
expected = {'//null-': {None: True}}
self.assertEqual(hl, expected)
sr = PtraceRule.from_hashlog(hl)
with self.assertRaises(StopIteration):
next(sr)
class PtraceFromInit(PtraceTest):
tests = (

View File

@@ -16,7 +16,7 @@
import unittest
from collections import namedtuple
from apparmor.common import AppArmorBug, AppArmorException
from apparmor.common import AppArmorBug, AppArmorException, hasher
from apparmor.logparser import ReadLog
from apparmor.rule.signal import SignalRule, SignalRuleset
from apparmor.translations import init_translation
@@ -134,6 +134,24 @@ class SignalTestParseFromLog(SignalTest):
self.assertEqual(obj.get_raw(1), ' signal send set=term peer=/usr/bin/pulseaudio///usr/lib/pulseaudio/pulse/gconf-helper,')
def test_null_signal_from_log(self):
log = 'type=AVC msg=audit(1409438250.564:201): apparmor="DENIED" operation="signal" profile="/usr/bin/pulseaudio" pid=2531 comm="pulseaudio" requested_mask="send" denied_mask="send" signal=term peer="//null-"'
parser = ReadLog('', '', '')
hl = hasher()
ev = parser.parse_event(log)
SignalRule.hashlog_from_event(hl, ev)
expected = {'//null-': {'send': {'term': True}}}
self.assertEqual(hl, expected)
sr = SignalRule.from_hashlog(hl)
with self.assertRaises(StopIteration):
next(sr)
class SignalFromInit(SignalTest):
tests = (

View File

@@ -15,10 +15,13 @@
import unittest
from collections import namedtuple
from apparmor.logparser import ReadLog
from common_test import AATest, setup_all_loops
from apparmor.rule.userns import UserNamespaceRule, UserNamespaceRuleset
from apparmor.common import AppArmorException, AppArmorBug
from apparmor.common import AppArmorException, AppArmorBug, hasher
from apparmor.translations import init_translation
_ = init_translation()
@@ -142,6 +145,27 @@ class UserNamespaceLogprofHeaderTest(AATest):
obj = UserNamespaceRule.create_instance(params)
self.assertEqual(obj.logprof_header(), expected)
def test_unconfined_usens_from_log(self):
log = 'type=AVC msg=audit(1720613712.153:168): apparmor="AUDIT" operation="userns_create" class="namespace" info="Userns create - transitioning profile" profile="unconfined" pid=5630 comm="unshare" requested="userns_create" target="unprivileged_userns" execpath="/usr/bin/unshare"'
parser = ReadLog('', '', '')
hl = hasher()
ev = parser.parse_event(log)
UserNamespaceRule.hashlog_from_event(hl, ev)
expected = {'create': True}
self.assertEqual(hl, expected)
ur = UserNamespaceRule.from_hashlog(hl)
expected = UserNamespaceRule('create')
self.assertTrue(expected.is_equal(next(ur)))
with self.assertRaises(StopIteration):
next(ur)
class UserNamespaceGlobTestAATest(AATest):
def test_glob(self):