mirror of
https://gitlab.com/apparmor/apparmor
synced 2025-09-01 14:55:10 +00:00
utils: Basic support for signal rules
Bug: https://bugs.launchpad.net/bugs/1300316 This patch does bare bones parsing of signal rules and stores the raw strings for writing them out later. It is meant to be a simple change to prevent aa.py from emitting a traceback when encountering signal rules. Signed-off-by: Tyler Hicks <tyhicks@canonical.com> Acked-by: Steve Beattie <steve@nxnw.org> Acked-By: Christian Boltz <apparmor@cboltz.de>
This commit is contained in:
@@ -2624,6 +2624,7 @@ RE_NETWORK_FAMILY_TYPE = re.compile('\s+(\S+)\s+(\S+)\s*,$')
|
|||||||
RE_NETWORK_FAMILY = re.compile('\s+(\S+)\s*,$')
|
RE_NETWORK_FAMILY = re.compile('\s+(\S+)\s*,$')
|
||||||
RE_PROFILE_DBUS = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?(dbus[^#]*\s*,)\s*(#.*)?$')
|
RE_PROFILE_DBUS = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?(dbus[^#]*\s*,)\s*(#.*)?$')
|
||||||
RE_PROFILE_MOUNT = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?((mount|remount|umount)[^#]*\s*,)\s*(#.*)?$')
|
RE_PROFILE_MOUNT = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?((mount|remount|umount)[^#]*\s*,)\s*(#.*)?$')
|
||||||
|
RE_PROFILE_SIGNAL = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?((signal)[^#]*\s*,)\s*(#.*)?$')
|
||||||
|
|
||||||
# match anything that's not " or #, or matching quotes with anything except quotes inside
|
# match anything that's not " or #, or matching quotes with anything except quotes inside
|
||||||
__re_no_or_quoted_hash = '([^#"]|"[^"]*")*'
|
__re_no_or_quoted_hash = '([^#"]|"[^"]*")*'
|
||||||
@@ -2702,6 +2703,7 @@ def parse_profile_data(data, file, do_include):
|
|||||||
profile_data[profile][hat]['allow']['path'] = hasher()
|
profile_data[profile][hat]['allow']['path'] = hasher()
|
||||||
profile_data[profile][hat]['allow']['dbus'] = list()
|
profile_data[profile][hat]['allow']['dbus'] = list()
|
||||||
profile_data[profile][hat]['allow']['mount'] = list()
|
profile_data[profile][hat]['allow']['mount'] = list()
|
||||||
|
profile_data[profile][hat]['allow']['signal'] = list()
|
||||||
# Save the initial comment
|
# Save the initial comment
|
||||||
if initial_comment:
|
if initial_comment:
|
||||||
profile_data[profile][hat]['initial_comment'] = initial_comment
|
profile_data[profile][hat]['initial_comment'] = initial_comment
|
||||||
@@ -3062,6 +3064,28 @@ def parse_profile_data(data, file, do_include):
|
|||||||
mount_rules.append(mount_rule)
|
mount_rules.append(mount_rule)
|
||||||
profile_data[profile][hat][allow]['mount'] = mount_rules
|
profile_data[profile][hat][allow]['mount'] = mount_rules
|
||||||
|
|
||||||
|
elif RE_PROFILE_SIGNAL.search(line):
|
||||||
|
matches = RE_PROFILE_SIGNAL.search(line).groups()
|
||||||
|
|
||||||
|
if not profile:
|
||||||
|
raise AppArmorException(_('Syntax Error: Unexpected signal entry found in file: %s line: %s') % (file, lineno + 1))
|
||||||
|
|
||||||
|
audit = False
|
||||||
|
if matches[0]:
|
||||||
|
audit = True
|
||||||
|
allow = 'allow'
|
||||||
|
if matches[1] and matches[1].strip() == 'deny':
|
||||||
|
allow = 'deny'
|
||||||
|
signal = matches[2].strip()
|
||||||
|
|
||||||
|
signal_rule = parse_signal_rule(signal)
|
||||||
|
signal_rule.audit = audit
|
||||||
|
signal_rule.deny = (allow == 'deny')
|
||||||
|
|
||||||
|
signal_rules = profile_data[profile][hat][allow].get('signal', list())
|
||||||
|
signal_rules.append(signal_rule)
|
||||||
|
profile_data[profile][hat][allow]['signal'] = signal_rules
|
||||||
|
|
||||||
elif RE_PROFILE_CHANGE_HAT.search(line):
|
elif RE_PROFILE_CHANGE_HAT.search(line):
|
||||||
matches = RE_PROFILE_CHANGE_HAT.search(line).groups()
|
matches = RE_PROFILE_CHANGE_HAT.search(line).groups()
|
||||||
|
|
||||||
@@ -3160,6 +3184,10 @@ def parse_mount_rule(line):
|
|||||||
# XXX Do real parsing here
|
# XXX Do real parsing here
|
||||||
return aarules.Raw_Mount_Rule(line)
|
return aarules.Raw_Mount_Rule(line)
|
||||||
|
|
||||||
|
def parse_signal_rule(line):
|
||||||
|
# XXX Do real parsing here
|
||||||
|
return aarules.Raw_Signal_Rule(line)
|
||||||
|
|
||||||
def separate_vars(vs):
|
def separate_vars(vs):
|
||||||
"""Returns a list of all the values for a variable"""
|
"""Returns a list of all the values for a variable"""
|
||||||
data = []
|
data = []
|
||||||
@@ -3389,6 +3417,24 @@ def write_mount(prof_data, depth):
|
|||||||
data += write_mount_rules(prof_data, depth, 'allow')
|
data += write_mount_rules(prof_data, depth, 'allow')
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def write_signal_rules(prof_data, depth, allow):
|
||||||
|
pre = ' ' * depth
|
||||||
|
data = []
|
||||||
|
|
||||||
|
# no signal rules, so return
|
||||||
|
if not prof_data[allow].get('signal', False):
|
||||||
|
return data
|
||||||
|
|
||||||
|
for signal_rule in prof_data[allow]['signal']:
|
||||||
|
data.append('%s%s' % (pre, signal_rule.serialize()))
|
||||||
|
data.append('')
|
||||||
|
return data
|
||||||
|
|
||||||
|
def write_signal(prof_data, depth):
|
||||||
|
data = write_signal_rules(prof_data, depth, 'deny')
|
||||||
|
data += write_signal_rules(prof_data, depth, 'allow')
|
||||||
|
return data
|
||||||
|
|
||||||
def write_link_rules(prof_data, depth, allow):
|
def write_link_rules(prof_data, depth, allow):
|
||||||
pre = ' ' * depth
|
pre = ' ' * depth
|
||||||
data = []
|
data = []
|
||||||
@@ -3495,6 +3541,7 @@ def write_rules(prof_data, depth):
|
|||||||
data += write_netdomain(prof_data, depth)
|
data += write_netdomain(prof_data, depth)
|
||||||
data += write_dbus(prof_data, depth)
|
data += write_dbus(prof_data, depth)
|
||||||
data += write_mount(prof_data, depth)
|
data += write_mount(prof_data, depth)
|
||||||
|
data += write_signal(prof_data, depth)
|
||||||
data += write_links(prof_data, depth)
|
data += write_links(prof_data, depth)
|
||||||
data += write_paths(prof_data, depth)
|
data += write_paths(prof_data, depth)
|
||||||
data += write_change_profile(prof_data, depth)
|
data += write_change_profile(prof_data, depth)
|
||||||
@@ -3644,6 +3691,7 @@ def serialize_profile_from_old_profile(profile_data, name, options):
|
|||||||
'netdomain': write_netdomain,
|
'netdomain': write_netdomain,
|
||||||
'dbus': write_dbus,
|
'dbus': write_dbus,
|
||||||
'mount': write_mount,
|
'mount': write_mount,
|
||||||
|
'signal': write_signal,
|
||||||
'link': write_links,
|
'link': write_links,
|
||||||
'path': write_paths,
|
'path': write_paths,
|
||||||
'change_profile': write_change_profile,
|
'change_profile': write_change_profile,
|
||||||
@@ -3736,6 +3784,7 @@ def serialize_profile_from_old_profile(profile_data, name, options):
|
|||||||
data += write_netdomain(write_prof_data[name], depth)
|
data += write_netdomain(write_prof_data[name], depth)
|
||||||
data += write_dbus(write_prof_data[name], depth)
|
data += write_dbus(write_prof_data[name], depth)
|
||||||
data += write_mount(write_prof_data[name], depth)
|
data += write_mount(write_prof_data[name], depth)
|
||||||
|
data += write_signal(write_prof_data[name], depth)
|
||||||
data += write_links(write_prof_data[name], depth)
|
data += write_links(write_prof_data[name], depth)
|
||||||
data += write_paths(write_prof_data[name], depth)
|
data += write_paths(write_prof_data[name], depth)
|
||||||
data += write_change_profile(write_prof_data[name], depth)
|
data += write_change_profile(write_prof_data[name], depth)
|
||||||
|
@@ -67,3 +67,15 @@ class Raw_Mount_Rule(object):
|
|||||||
return "%s%s%s" % ('audit ' if self.audit else '',
|
return "%s%s%s" % ('audit ' if self.audit else '',
|
||||||
'deny ' if self.deny else '',
|
'deny ' if self.deny else '',
|
||||||
self.rule)
|
self.rule)
|
||||||
|
|
||||||
|
class Raw_Signal_Rule(object):
|
||||||
|
audit = False
|
||||||
|
deny = False
|
||||||
|
|
||||||
|
def __init__(self, rule):
|
||||||
|
self.rule = rule
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
return "%s%s%s" % ('audit ' if self.audit else '',
|
||||||
|
'deny ' if self.deny else '',
|
||||||
|
self.rule)
|
||||||
|
@@ -41,7 +41,12 @@ regex_has_comma_testcases = [
|
|||||||
('audit "/tmp/foo, # bar" rw%s # comment', 'comment embedded in quote 02'),
|
('audit "/tmp/foo, # bar" rw%s # comment', 'comment embedded in quote 02'),
|
||||||
|
|
||||||
# lifted from parser/tst/simple_tests/vars/vars_alternation_3.sd
|
# lifted from parser/tst/simple_tests/vars/vars_alternation_3.sd
|
||||||
('/does/not/@{BAR},exist,notexist} r%s', 'partial alternation')
|
('/does/not/@{BAR},exist,notexist} r%s', 'partial alternation'),
|
||||||
|
|
||||||
|
('signal%s', 'bare signal'),
|
||||||
|
('signal receive%s', 'simple signal'),
|
||||||
|
('signal (send, receive)%s', 'embedded parens signal 01'),
|
||||||
|
('signal (send, receive) set=(hup, quit)%s', 'embedded parens signal 02'),
|
||||||
|
|
||||||
# the following fail due to inadequacies in the regex
|
# the following fail due to inadequacies in the regex
|
||||||
# ('dbus (r, w, %s', 'incomplete dbus action'),
|
# ('dbus (r, w, %s', 'incomplete dbus action'),
|
||||||
@@ -99,6 +104,8 @@ regex_split_comment_testcases = [
|
|||||||
('file,', False),
|
('file,', False),
|
||||||
('file, # bare', ('file, ', '# bare')),
|
('file, # bare', ('file, ', '# bare')),
|
||||||
('file /tmp/foo rw, # read-write', ('file /tmp/foo rw, ', '# read-write')),
|
('file /tmp/foo rw, # read-write', ('file /tmp/foo rw, ', '# read-write')),
|
||||||
|
('signal, # comment', ('signal, ', '# comment')),
|
||||||
|
('signal receive set=(usr1 usr2) peer=foo,', False),
|
||||||
]
|
]
|
||||||
|
|
||||||
def setup_split_comment_testcases():
|
def setup_split_comment_testcases():
|
||||||
@@ -276,6 +283,88 @@ class AARegexFile(unittest.TestCase):
|
|||||||
result = aa.RE_PROFILE_FILE_ENTRY.search(line)
|
result = aa.RE_PROFILE_FILE_ENTRY.search(line)
|
||||||
self.assertFalse(result, 'RE_PROFILE_FILE_ENTRY unexpectedly matched "%s"' % line)
|
self.assertFalse(result, 'RE_PROFILE_FILE_ENTRY unexpectedly matched "%s"' % line)
|
||||||
|
|
||||||
|
class AARegexSignal(unittest.TestCase):
|
||||||
|
'''Tests for RE_PROFILE_SIGNAL'''
|
||||||
|
|
||||||
|
def test_bare_signal_01(self):
|
||||||
|
'''test ' signal,' '''
|
||||||
|
|
||||||
|
rule = 'signal,'
|
||||||
|
line = ' %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
|
def test_bare_signal_02(self):
|
||||||
|
'''test ' audit signal,' '''
|
||||||
|
|
||||||
|
rule = 'signal,'
|
||||||
|
line = ' audit %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
self.assertTrue(result.groups()[0], 'Couldn\'t find audit modifier in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
|
def test_simple_signal_01(self):
|
||||||
|
'''test ' signal receive,' '''
|
||||||
|
|
||||||
|
rule = 'signal receive,'
|
||||||
|
line = ' %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
|
def test_simple_signal_02(self):
|
||||||
|
'''test ' signal (send, receive),' '''
|
||||||
|
|
||||||
|
rule = 'signal (send, receive),'
|
||||||
|
line = ' %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
|
def test_simple_signal_03(self):
|
||||||
|
'''test ' audit signal (receive),' '''
|
||||||
|
|
||||||
|
rule = 'signal (receive),'
|
||||||
|
line = ' audit %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
self.assertTrue(result.groups()[0], 'Couldn\'t find audit modifier in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
|
def test_set_signal_01(self):
|
||||||
|
'''test ' signal (send, receive) set=(usr1 usr2),' '''
|
||||||
|
|
||||||
|
rule = 'signal (send, receive) set=(usr1 usr2),'
|
||||||
|
line = ' %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
|
def test_peer_signal_01(self):
|
||||||
|
'''test ' signal send set=(hup, quit) peer=/usr/sbin/daemon,' '''
|
||||||
|
|
||||||
|
rule = 'signal send set=(hup, quit) peer=/usr/sbin/daemon,'
|
||||||
|
line = ' %s' % rule
|
||||||
|
result = aa.RE_PROFILE_SIGNAL.search(line)
|
||||||
|
self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
|
||||||
|
parsed = result.groups()[2].strip()
|
||||||
|
self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
|
||||||
|
% (rule, parsed))
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
verbosity = 2
|
verbosity = 2
|
||||||
|
|
||||||
@@ -288,6 +377,7 @@ if __name__ == '__main__':
|
|||||||
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexCapability))
|
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexCapability))
|
||||||
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexPath))
|
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexPath))
|
||||||
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexFile))
|
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexFile))
|
||||||
|
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexSignal))
|
||||||
result = unittest.TextTestRunner(verbosity=verbosity).run(test_suite)
|
result = unittest.TextTestRunner(verbosity=verbosity).run(test_suite)
|
||||||
if not result.wasSuccessful():
|
if not result.wasSuccessful():
|
||||||
exit(1)
|
exit(1)
|
||||||
|
62
utils/test/test-signal_parse.py
Normal file
62
utils/test/test-signal_parse.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
#! /usr/bin/env python
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# Copyright (C) 2014 Canonical Ltd.
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or
|
||||||
|
# modify it under the terms of version 2 of the GNU General Public
|
||||||
|
# License published by the Free Software Foundation.
|
||||||
|
#
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
import apparmor.aa as aa
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
class AAParseSignalTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def _test_parse_signal_rule(self, rule):
|
||||||
|
signal = aa.parse_signal_rule(rule)
|
||||||
|
self.assertEqual(rule, signal.serialize(),
|
||||||
|
'signal object returned "%s", expected "%s"' % (signal.serialize(), rule))
|
||||||
|
|
||||||
|
def test_parse_plain_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal,')
|
||||||
|
|
||||||
|
def test_parse_receive_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal (receive),')
|
||||||
|
|
||||||
|
def test_parse_send_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal (send),')
|
||||||
|
|
||||||
|
def test_parse_send_receive_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal (send receive),')
|
||||||
|
|
||||||
|
def test_parse_r_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal r,')
|
||||||
|
|
||||||
|
def test_parse_w_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal w,')
|
||||||
|
|
||||||
|
def test_parse_rw_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal rw,')
|
||||||
|
|
||||||
|
def test_parse_set_1_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal send set=("hup"),')
|
||||||
|
|
||||||
|
def test_parse_set_2_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal (receive) set=kill,')
|
||||||
|
|
||||||
|
def test_parse_set_3_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal w set=(quit int),')
|
||||||
|
|
||||||
|
def test_parse_peer_1_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal receive peer=foo,')
|
||||||
|
|
||||||
|
def test_parse_peer_2_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal (send receive) peer=/usr/bin/bar,')
|
||||||
|
|
||||||
|
def test_parse_peer_3_signal_rule(self):
|
||||||
|
self._test_parse_signal_rule('signal wr set=(pipe, usr1) peer=/sbin/baz,')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Reference in New Issue
Block a user