2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-09-01 14:55:10 +00:00

move several write_* functions to apparmor.profile_storage

ProfileStorage() stores the content of a profile, so it makes sense to
also have the functions to write those rules (including helper functions
used by these functions) in the same file.

Note that I only moved the functions for rule types that are not handled
by *Ruleset classes.

The functions for writing rules stored in a *Ruleset class will
hopefully be superfluous sooner or later (probably later because
serialize_parse_profile_start() depends on them, and rewriting it won't
be easy)

Also move the test for var_transform() to test-profile-storage.py.
This commit is contained in:
Christian Boltz
2018-05-09 22:08:44 +02:00
parent c47ed1d2e5
commit 66620f3e19
4 changed files with 177 additions and 167 deletions

View File

@@ -49,7 +49,9 @@ from apparmor.regex import (RE_PROFILE_START, RE_PROFILE_END, RE_PROFILE_LINK,
RE_PROFILE_UNIX, RE_RULE_HAS_COMMA, RE_HAS_COMMENT_SPLIT,
strip_quotes, parse_profile_start_line, re_match_include )
from apparmor.profile_storage import ProfileStorage, ruletypes
from apparmor.profile_storage import (ProfileStorage, ruletypes, write_alias,
write_includes, write_links, write_list_vars, write_mount,
write_pivot_root, write_unix)
import apparmor.rules as aarules
@@ -2610,89 +2612,18 @@ def write_header(prof_data, depth, name, embedded_hat, write_flags):
return data
def set_allow_str(allow):
if allow == 'deny':
return 'deny '
elif allow == 'allow':
return ''
elif allow == '':
return ''
else:
raise AppArmorException(_("Invalid allow string: %(allow)s"))
def set_ref_allow(prof_data, allow):
if allow:
return prof_data[allow], set_allow_str(allow)
else:
return prof_data, ''
def write_pair(prof_data, depth, allow, name, prefix, sep, tail, fn):
pre = ' ' * depth
data = []
ref, allow = set_ref_allow(prof_data, allow)
if ref.get(name, False):
for key in sorted(ref[name].keys()):
value = fn(ref[name][key]) # eval('%s(%s)' % (fn, ref[name][key]))
data.append('%s%s%s%s%s%s%s' % (pre, allow, prefix, key, sep, value, tail))
if ref[name].keys():
data.append('')
return data
def write_includes(prof_data, depth):
pre = ' ' * depth
data = []
for key in sorted(prof_data['include'].keys()):
if key.startswith('/'):
qkey = '"%s"' % key
else:
qkey = '<%s>' % quote_if_needed(key)
data.append('%s#include %s' % (pre, qkey))
if data:
data.append('')
return data
def write_change_profile(prof_data, depth):
data = []
if prof_data.get('change_profile', False):
data = prof_data['change_profile'].get_clean(depth)
return data
def write_alias(prof_data, depth):
pre = ' ' * depth
data = []
if prof_data['alias']:
for key in sorted(prof_data['alias'].keys()):
data.append('%salias %s -> %s,' % (pre, quote_if_needed(key), quote_if_needed(prof_data['alias'][key])))
data.append('')
return data
def write_rlimits(prof_data, depth):
data = []
if prof_data.get('rlimit', False):
data = prof_data['rlimit'].get_clean(depth)
return data
def var_transform(ref):
data = []
for value in ref:
if not value:
value = '""'
data.append(quote_if_needed(value))
return ' '.join(data)
def write_list_vars(prof_data, depth):
return write_pair(prof_data, depth, '', 'lvar', '', ' = ', '', var_transform)
def write_capabilities(prof_data, depth):
data = []
if prof_data.get('capability', False):
@@ -2711,24 +2642,6 @@ def write_dbus(prof_data, depth):
data = prof_data['dbus'].get_clean(depth)
return data
def write_mount_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
# no mount rules, so return
if not prof_data[allow].get('mount', False):
return data
for mount_rule in prof_data[allow]['mount']:
data.append('%s%s' % (pre, mount_rule.serialize()))
data.append('')
return data
def write_mount(prof_data, depth):
data = write_mount_rules(prof_data, depth, 'deny')
data += write_mount_rules(prof_data, depth, 'allow')
return data
def write_signal(prof_data, depth):
data = []
if prof_data.get('signal', False):
@@ -2741,69 +2654,6 @@ def write_ptrace(prof_data, depth):
data = prof_data['ptrace'].get_clean(depth)
return data
def write_pivot_root_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
# no pivot_root rules, so return
if not prof_data[allow].get('pivot_root', False):
return data
for pivot_root_rule in prof_data[allow]['pivot_root']:
data.append('%s%s' % (pre, pivot_root_rule.serialize()))
data.append('')
return data
def write_pivot_root(prof_data, depth):
data = write_pivot_root_rules(prof_data, depth, 'deny')
data += write_pivot_root_rules(prof_data, depth, 'allow')
return data
def write_unix_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
# no unix rules, so return
if not prof_data[allow].get('unix', False):
return data
for unix_rule in prof_data[allow]['unix']:
data.append('%s%s' % (pre, unix_rule.serialize()))
data.append('')
return data
def write_unix(prof_data, depth):
data = write_unix_rules(prof_data, depth, 'deny')
data += write_unix_rules(prof_data, depth, 'allow')
return data
def write_link_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = set_allow_str(allow)
if prof_data[allow].get('link', False):
for path in sorted(prof_data[allow]['link'].keys()):
to_name = prof_data[allow]['link'][path]['to']
subset = ''
if prof_data[allow]['link'][path]['mode'] & apparmor.aamode.AA_LINK_SUBSET:
subset = 'subset '
audit = ''
if prof_data[allow]['link'][path].get('audit', False):
audit = 'audit '
path = quote_if_needed(path)
to_name = quote_if_needed(to_name)
data.append('%s%s%slink %s%s -> %s,' % (pre, audit, allowstr, subset, path, to_name))
data.append('')
return data
def write_links(prof_data, depth):
data = write_link_rules(prof_data, depth, 'deny')
data += write_link_rules(prof_data, depth, 'allow')
return data
def write_file(prof_data, depth):
data = []
if prof_data.get('file', False):

View File

@@ -14,7 +14,9 @@
# ----------------------------------------------------------------------
from apparmor.common import AppArmorBug, hasher
from apparmor.aamode import AA_LINK_SUBSET
from apparmor.common import AppArmorBug, AppArmorException, hasher
from apparmor.rule.capability import CapabilityRuleset
from apparmor.rule.change_profile import ChangeProfileRuleset
@@ -25,6 +27,12 @@ from apparmor.rule.ptrace import PtraceRuleset
from apparmor.rule.rlimit import RlimitRuleset
from apparmor.rule.signal import SignalRuleset
from apparmor.rule import quote_if_needed
# setup module translations
from apparmor.translations import init_translation
_ = init_translation()
ruletypes = {
'capability': {'ruleset': CapabilityRuleset},
'change_profile': {'ruleset': ChangeProfileRuleset},
@@ -104,3 +112,155 @@ class ProfileStorage:
return self.data.get(key, fallback)
else:
raise AppArmorBug('attempt to read unknown key %s' % key)
def set_allow_str(allow):
if allow == 'deny':
return 'deny '
elif allow == 'allow':
return ''
elif allow == '':
return ''
else:
raise AppArmorException(_("Invalid allow string: %(allow)s"))
def set_ref_allow(prof_data, allow):
if allow:
return prof_data[allow], set_allow_str(allow)
else:
return prof_data, ''
def write_pair(prof_data, depth, allow, name, prefix, sep, tail, fn):
pre = ' ' * depth
data = []
ref, allow = set_ref_allow(prof_data, allow)
if ref.get(name, False):
for key in sorted(ref[name].keys()):
value = fn(ref[name][key]) # eval('%s(%s)' % (fn, ref[name][key]))
data.append('%s%s%s%s%s%s%s' % (pre, allow, prefix, key, sep, value, tail))
if ref[name].keys():
data.append('')
return data
def write_alias(prof_data, depth):
pre = ' ' * depth
data = []
if prof_data['alias']:
for key in sorted(prof_data['alias'].keys()):
data.append('%salias %s -> %s,' % (pre, quote_if_needed(key), quote_if_needed(prof_data['alias'][key])))
data.append('')
return data
def write_includes(prof_data, depth):
pre = ' ' * depth
data = []
for key in sorted(prof_data['include'].keys()):
if key.startswith('/'):
qkey = '"%s"' % key
else:
qkey = '<%s>' % quote_if_needed(key)
data.append('%s#include %s' % (pre, qkey))
if data:
data.append('')
return data
def var_transform(ref):
data = []
for value in ref:
if not value:
value = '""'
data.append(quote_if_needed(value))
return ' '.join(data)
def write_list_vars(prof_data, depth):
return write_pair(prof_data, depth, '', 'lvar', '', ' = ', '', var_transform)
def write_link_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = set_allow_str(allow)
if prof_data[allow].get('link', False):
for path in sorted(prof_data[allow]['link'].keys()):
to_name = prof_data[allow]['link'][path]['to']
subset = ''
if prof_data[allow]['link'][path]['mode'] & AA_LINK_SUBSET:
subset = 'subset '
audit = ''
if prof_data[allow]['link'][path].get('audit', False):
audit = 'audit '
path = quote_if_needed(path)
to_name = quote_if_needed(to_name)
data.append('%s%s%slink %s%s -> %s,' % (pre, audit, allowstr, subset, path, to_name))
data.append('')
return data
def write_links(prof_data, depth):
data = write_link_rules(prof_data, depth, 'deny')
data += write_link_rules(prof_data, depth, 'allow')
return data
def write_mount_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
# no mount rules, so return
if not prof_data[allow].get('mount', False):
return data
for mount_rule in prof_data[allow]['mount']:
data.append('%s%s' % (pre, mount_rule.serialize()))
data.append('')
return data
def write_mount(prof_data, depth):
data = write_mount_rules(prof_data, depth, 'deny')
data += write_mount_rules(prof_data, depth, 'allow')
return data
def write_pivot_root_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
# no pivot_root rules, so return
if not prof_data[allow].get('pivot_root', False):
return data
for pivot_root_rule in prof_data[allow]['pivot_root']:
data.append('%s%s' % (pre, pivot_root_rule.serialize()))
data.append('')
return data
def write_pivot_root(prof_data, depth):
data = write_pivot_root_rules(prof_data, depth, 'deny')
data += write_pivot_root_rules(prof_data, depth, 'allow')
return data
def write_unix(prof_data, depth):
data = write_unix_rules(prof_data, depth, 'deny')
data += write_unix_rules(prof_data, depth, 'allow')
return data
def write_unix_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
# no unix rules, so return
if not prof_data[allow].get('unix', False):
return data
for unix_rule in prof_data[allow]['unix']:
data.append('%s%s' % (pre, unix_rule.serialize()))
data.append('')
return data

View File

@@ -21,7 +21,7 @@ import apparmor.aa # needed to set global vars in some tests
from apparmor.aa import (check_for_apparmor, get_output, get_reqs, get_interpreter_and_abstraction, create_new_profile,
get_profile_flags, set_profile_flags, set_options_audit_mode, set_options_owner_mode, is_skippable_file, is_skippable_dir,
parse_profile_start, parse_profile_data, separate_vars, store_list_var, write_header,
var_transform, serialize_parse_profile_start, get_file_perms, propose_file_rules)
serialize_parse_profile_start, get_file_perms, propose_file_rules)
from apparmor.aare import AARE
from apparmor.common import AppArmorException, AppArmorBug
from apparmor.rule.file import FileRule
@@ -677,17 +677,6 @@ class AaTest_write_header(AATest):
result = write_header(prof_data, depth, name, embedded_hat, write_flags)
self.assertEqual(result, [expected])
class AaTest_var_transform(AATest):
tests = [
(['foo', ''], 'foo ""' ),
(['foo', 'bar'], 'foo bar' ),
([''], '""' ),
(['bar baz', 'foo'], '"bar baz" foo' ),
]
def _run_test(self, params, expected):
self.assertEqual(var_transform(params), expected)
class AaTest_serialize_parse_profile_start(AATest):
def _parse(self, line, profile, hat, prof_data_profile, prof_data_external):
# 'correct' is always True in the code that uses serialize_parse_profile_start() (set some lines above the function call)

View File

@@ -13,7 +13,7 @@ import unittest
from common_test import AATest, setup_all_loops
from apparmor.common import AppArmorBug
from apparmor.profile_storage import ProfileStorage
from apparmor.profile_storage import ProfileStorage, var_transform
class TestUnknownKey(AATest):
def AASetup(self):
@@ -35,6 +35,17 @@ class TestUnknownKey(AATest):
with self.assertRaises(AppArmorBug):
self.storage['foo'] = 'bar'
class AaTest_var_transform(AATest):
tests = [
(['foo', ''], 'foo ""' ),
(['foo', 'bar'], 'foo bar' ),
([''], '""' ),
(['bar baz', 'foo'], '"bar baz" foo' ),
]
def _run_test(self, params, expected):
self.assertEqual(var_transform(params), expected)
setup_all_loops(__name__)
if __name__ == '__main__':