mirror of
https://gitlab.com/apparmor/apparmor
synced 2025-08-29 05:17:59 +00:00
delete serialize_profile_from_old_profile()
... which is unused since the last commit. Also delete several functions that were only used by this function: - write_change_profile() - write_rlimits() - write_capabilities() - write_netdomain() - write_dbus() - write_signal() - write_ptrace() - write_file() Finally, no longer import some functions from profile_storage: - write_links - write_mount - write_pivot_root - write_unix BTW: Deleting these 460 lines improves test coverage of aa.py from 38% to 44%, and total test coverage from 63% to 66% :-)
This commit is contained in:
parent
469eb444de
commit
0eb12a8cbd
@ -50,8 +50,7 @@ from apparmor.regex import (RE_PROFILE_START, RE_PROFILE_END, RE_PROFILE_LINK,
|
||||
strip_quotes, parse_profile_start_line, re_match_include )
|
||||
|
||||
from apparmor.profile_storage import (ProfileStorage, ruletypes, write_alias,
|
||||
write_includes, write_links, write_list_vars, write_mount,
|
||||
write_pivot_root, write_unix)
|
||||
write_includes, write_list_vars )
|
||||
|
||||
import apparmor.rules as aarules
|
||||
|
||||
@ -2610,54 +2609,6 @@ def write_header(prof_data, depth, name, embedded_hat, write_flags):
|
||||
|
||||
return data
|
||||
|
||||
def write_change_profile(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('change_profile', False):
|
||||
data = prof_data['change_profile'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_rlimits(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('rlimit', False):
|
||||
data = prof_data['rlimit'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_capabilities(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('capability', False):
|
||||
data = prof_data['capability'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_netdomain(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('network', False):
|
||||
data = prof_data['network'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_dbus(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('dbus', False):
|
||||
data = prof_data['dbus'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_signal(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('signal', False):
|
||||
data = prof_data['signal'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_ptrace(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('ptrace', False):
|
||||
data = prof_data['ptrace'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_file(prof_data, depth):
|
||||
data = []
|
||||
if prof_data.get('file', False):
|
||||
data = prof_data['file'].get_clean(depth)
|
||||
return data
|
||||
|
||||
def write_rules(prof_data, depth):
|
||||
if type(prof_data) is not ProfileStorage:
|
||||
# TODO: find out why prof_data is not a ProfileStorage object in 11 of the libapparmor test_multi testcases
|
||||
@ -2773,418 +2724,6 @@ def serialize_parse_profile_start(line, file, lineno, profile, hat, prof_data_pr
|
||||
|
||||
return (profile, hat, attachment, flags, in_contained_hat, correct)
|
||||
|
||||
def serialize_profile_from_old_profile(profile_data, name, options):
|
||||
data = []
|
||||
string = ''
|
||||
include_metadata = False
|
||||
include_flags = True
|
||||
prof_filename = get_profile_filename(name)
|
||||
|
||||
write_filelist = deepcopy(filelist[prof_filename])
|
||||
write_prof_data = deepcopy(profile_data)
|
||||
|
||||
# XXX profile_data / write_prof_data contain only one profile with its hats
|
||||
# XXX this will explode if a file contains multiple profiles, see https://bugs.launchpad.net/ubuntu/+source/apparmor/+bug/1528139
|
||||
# XXX fixing this needs lots of write_prof_data[hat] -> write_prof_data[profile][hat] changes (and of course also a change in the calling code)
|
||||
# XXX (the better option is a full rewrite of serialize_profile_from_old_profile())
|
||||
|
||||
if options: # and type(options) == dict:
|
||||
if options.get('METADATA', False):
|
||||
include_metadata = True
|
||||
if options.get('NO_FLAGS', False):
|
||||
include_flags = False
|
||||
|
||||
if include_metadata:
|
||||
string = '# Last Modified: %s\n' % time.asctime()
|
||||
|
||||
if (profile_data[name].get('repo', False) and
|
||||
profile_data[name]['repo']['url'] and
|
||||
profile_data[name]['repo']['user'] and
|
||||
profile_data[name]['repo']['id']):
|
||||
repo = profile_data[name]['repo']
|
||||
string += '# REPOSITORY: %s %s %s\n' % (repo['url'], repo['user'], repo['id'])
|
||||
elif profile_data[name]['repo']['neversubmit']:
|
||||
string += '# REPOSITORY: NEVERSUBMIT\n'
|
||||
|
||||
if not os.path.isfile(prof_filename):
|
||||
raise AppArmorException(_("Can't find existing profile to modify"))
|
||||
|
||||
# profiles_list = filelist[prof_filename].keys() # XXX
|
||||
|
||||
with open_file_read(prof_filename) as f_in:
|
||||
profile = None
|
||||
hat = None
|
||||
write_methods = {'alias': write_alias,
|
||||
'lvar': write_list_vars,
|
||||
'include': write_includes,
|
||||
'rlimit': write_rlimits,
|
||||
'capability': write_capabilities,
|
||||
'network': write_netdomain,
|
||||
'dbus': write_dbus,
|
||||
'mount': write_mount,
|
||||
'signal': write_signal,
|
||||
'ptrace': write_ptrace,
|
||||
'pivot_root': write_pivot_root,
|
||||
'unix': write_unix,
|
||||
'link': write_links,
|
||||
'file': write_file,
|
||||
'change_profile': write_change_profile,
|
||||
}
|
||||
default_write_order = [ 'alias',
|
||||
'lvar',
|
||||
'include',
|
||||
'rlimit',
|
||||
'capability',
|
||||
'network',
|
||||
'dbus',
|
||||
'mount',
|
||||
'signal',
|
||||
'ptrace',
|
||||
'pivot_root',
|
||||
'unix',
|
||||
'link',
|
||||
'file',
|
||||
'change_profile',
|
||||
]
|
||||
# prof_correct = True # XXX correct?
|
||||
segments = {'alias': False,
|
||||
'lvar': False,
|
||||
'include': False,
|
||||
'rlimit': False,
|
||||
'capability': False,
|
||||
'network': False,
|
||||
'dbus': False,
|
||||
'mount': False,
|
||||
'signal': True, # not handled otherwise yet
|
||||
'ptrace': True, # not handled otherwise yet
|
||||
'pivot_root': False,
|
||||
'unix': False,
|
||||
'link': False,
|
||||
'file': False,
|
||||
'change_profile': False,
|
||||
'include_local_started': False, # unused
|
||||
}
|
||||
|
||||
def write_prior_segments(prof_data, segments, line):
|
||||
data = []
|
||||
for segs in list(filter(lambda x: segments[x], segments.keys())):
|
||||
depth = len(line) - len(line.lstrip())
|
||||
data += write_methods[segs](prof_data, int(depth / 2))
|
||||
segments[segs] = False
|
||||
# delete rules from prof_data to avoid duplication (they are in data now)
|
||||
if prof_data['allow'].get(segs, False):
|
||||
prof_data['allow'].pop(segs)
|
||||
if prof_data['deny'].get(segs, False):
|
||||
prof_data['deny'].pop(segs)
|
||||
if prof_data.get(segs, False):
|
||||
t = type(prof_data[segs])
|
||||
prof_data[segs] = t()
|
||||
return data
|
||||
|
||||
#data.append('reading prof')
|
||||
for line in f_in:
|
||||
correct = True
|
||||
line = line.rstrip('\n')
|
||||
#data.append(' ')#data.append('read: '+line)
|
||||
if RE_PROFILE_START.search(line):
|
||||
|
||||
(profile, hat, attachment, flags, in_contained_hat, correct) = serialize_parse_profile_start(
|
||||
line, prof_filename, None, profile, hat, write_prof_data[hat]['profile'], write_prof_data[hat]['external'], correct)
|
||||
|
||||
if not write_prof_data[hat]['name'] == profile:
|
||||
correct = False
|
||||
|
||||
if not write_filelist['profiles'][profile][hat] is True:
|
||||
correct = False
|
||||
|
||||
if not write_prof_data[hat]['flags'] == flags:
|
||||
correct = False
|
||||
|
||||
#Write the profile start
|
||||
if correct:
|
||||
if write_filelist:
|
||||
data += write_alias(write_filelist, 0)
|
||||
data += write_list_vars(write_filelist, 0)
|
||||
data += write_includes(write_filelist, 0)
|
||||
data.append(line)
|
||||
else:
|
||||
if write_prof_data[hat]['name'] == profile:
|
||||
depth = len(line) - len(line.lstrip())
|
||||
data += write_header(write_prof_data[name], int(depth / 2), name, False, include_flags)
|
||||
|
||||
elif RE_PROFILE_END.search(line):
|
||||
# DUMP REMAINDER OF PROFILE
|
||||
if profile:
|
||||
depth = int(len(line) - len(line.lstrip()) / 2) + 1
|
||||
|
||||
# first write sections that were modified
|
||||
#for segs in write_methods.keys():
|
||||
for segs in default_write_order:
|
||||
if segments[segs]:
|
||||
data += write_methods[segs](write_prof_data[name], depth)
|
||||
segments[segs] = False
|
||||
# delete rules from write_prof_data to avoid duplication (they are in data now)
|
||||
if write_prof_data[name]['allow'].get(segs, False):
|
||||
write_prof_data[name]['allow'].pop(segs)
|
||||
if write_prof_data[name]['deny'].get(segs, False):
|
||||
write_prof_data[name]['deny'].pop(segs)
|
||||
if write_prof_data[name].get(segs, False):
|
||||
t = type(write_prof_data[name][segs])
|
||||
write_prof_data[name][segs] = t()
|
||||
|
||||
# then write everything else
|
||||
for segs in default_write_order:
|
||||
data += write_methods[segs](write_prof_data[name], depth)
|
||||
|
||||
write_prof_data.pop(name)
|
||||
|
||||
#Append local includes
|
||||
data.append(line)
|
||||
|
||||
if not in_contained_hat:
|
||||
# Embedded hats
|
||||
depth = int((len(line) - len(line.lstrip())) / 2)
|
||||
pre2 = ' ' * (depth + 1)
|
||||
for hat in list(filter(lambda x: x != name, sorted(profile_data.keys()))):
|
||||
if not profile_data[hat]['external']:
|
||||
data.append('')
|
||||
if profile_data[hat]['profile']:
|
||||
data += list(map(str, write_header(profile_data[hat], depth + 1, hat, True, include_flags)))
|
||||
else:
|
||||
data += list(map(str, write_header(profile_data[hat], depth + 1, '^' + hat, True, include_flags)))
|
||||
|
||||
data += list(map(str, write_rules(profile_data[hat], depth + 2)))
|
||||
|
||||
data.append('%s}' % pre2)
|
||||
|
||||
# External hats
|
||||
for hat in list(filter(lambda x: x != name, sorted(profile_data.keys()))):
|
||||
if profile_data[hat].get('external', False):
|
||||
data.append('')
|
||||
data += list(map(lambda x: ' %s' % x, write_piece(profile_data, depth - 1, name, name, include_flags)))
|
||||
data.append(' }')
|
||||
|
||||
if in_contained_hat:
|
||||
#Hat processed, remove it
|
||||
hat = profile
|
||||
in_contained_hat = False
|
||||
else:
|
||||
profile = None
|
||||
|
||||
elif CapabilityRule.match(line):
|
||||
cap = CapabilityRule.parse(line)
|
||||
if write_prof_data[hat]['capability'].is_covered(cap, True, True):
|
||||
if not segments['capability'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['capability'] = True
|
||||
write_prof_data[hat]['capability'].delete(cap)
|
||||
data.append(line)
|
||||
else:
|
||||
# To-Do
|
||||
pass
|
||||
elif RE_PROFILE_LINK.search(line):
|
||||
matches = RE_PROFILE_LINK.search(line).groups()
|
||||
audit = False
|
||||
if matches[0]:
|
||||
audit = True
|
||||
allow = 'allow'
|
||||
if matches[1] and matches[1].strip() == 'deny':
|
||||
allow = 'deny'
|
||||
|
||||
subset = matches[3]
|
||||
link = strip_quotes(matches[6])
|
||||
value = strip_quotes(matches[7])
|
||||
if not write_prof_data[hat][allow]['link'][link]['to'] == value:
|
||||
correct = False
|
||||
if not write_prof_data[hat][allow]['link'][link]['mode'] & apparmor.aamode.AA_MAY_LINK:
|
||||
correct = False
|
||||
if subset and not write_prof_data[hat][allow]['link'][link]['mode'] & apparmor.aamode.AA_LINK_SUBSET:
|
||||
correct = False
|
||||
if audit and not write_prof_data[hat][allow]['link'][link]['audit'] & apparmor.aamode.AA_LINK_SUBSET:
|
||||
correct = False
|
||||
|
||||
if correct:
|
||||
if not segments['link'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['link'] = True
|
||||
write_prof_data[hat][allow]['link'].pop(link)
|
||||
data.append(line)
|
||||
else:
|
||||
# To-Do
|
||||
pass
|
||||
|
||||
elif ChangeProfileRule.match(line):
|
||||
change_profile_obj = ChangeProfileRule.parse(line)
|
||||
if write_prof_data[hat]['change_profile'].is_covered(change_profile_obj, True, True):
|
||||
if not segments['change_profile'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['change_profile'] = True
|
||||
write_prof_data[hat]['change_profile'].delete(change_profile_obj)
|
||||
data.append(line)
|
||||
|
||||
elif RE_PROFILE_ALIAS.search(line):
|
||||
matches = RE_PROFILE_ALIAS.search(line).groups()
|
||||
|
||||
from_name = strip_quotes(matches[0])
|
||||
to_name = strip_quotes(matches[1])
|
||||
|
||||
if profile:
|
||||
if not write_prof_data[hat]['alias'][from_name] == to_name:
|
||||
correct = False
|
||||
else:
|
||||
if not write_filelist['alias'][from_name] == to_name:
|
||||
correct = False
|
||||
|
||||
if correct:
|
||||
if not segments['alias'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['alias'] = True
|
||||
if profile:
|
||||
write_prof_data[hat]['alias'].pop(from_name)
|
||||
else:
|
||||
write_filelist['alias'].pop(from_name)
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
|
||||
elif RlimitRule.match(line):
|
||||
rlimit_obj = RlimitRule.parse(line)
|
||||
|
||||
if write_prof_data[hat]['rlimit'].is_covered(rlimit_obj, True, True):
|
||||
if not segments['rlimit'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['rlimit'] = True
|
||||
write_prof_data[hat]['rlimit'].delete(rlimit_obj)
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
|
||||
elif RE_PROFILE_BOOLEAN.search(line):
|
||||
matches = RE_PROFILE_BOOLEAN.search(line).groups()
|
||||
bool_var = matches[0]
|
||||
value = matches[1]
|
||||
|
||||
if not write_prof_data[hat]['lvar'][bool_var] == value:
|
||||
correct = False
|
||||
|
||||
if correct:
|
||||
if not segments['lvar'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['lvar'] = True
|
||||
write_prof_data[hat]['lvar'].pop(bool_var)
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
elif RE_PROFILE_VARIABLE.search(line):
|
||||
matches = RE_PROFILE_VARIABLE.search(line).groups()
|
||||
list_var = strip_quotes(matches[0])
|
||||
var_operation = matches[1]
|
||||
value = strip_quotes(matches[2])
|
||||
var_set = hasher()
|
||||
if var_operation == '+=':
|
||||
correct = False # adding proper support for "add to variable" needs big changes
|
||||
# (like storing a variable's "history" - where it was initially defined and what got added where)
|
||||
# so just skip any comparison and assume a non-match
|
||||
elif profile:
|
||||
store_list_var(var_set, list_var, value, var_operation, prof_filename)
|
||||
if not var_set[list_var] == write_prof_data['lvar'].get(list_var, False):
|
||||
correct = False
|
||||
else:
|
||||
store_list_var(var_set, list_var, value, var_operation, prof_filename)
|
||||
if not var_set[list_var] == write_filelist['lvar'].get(list_var, False):
|
||||
correct = False
|
||||
|
||||
if correct:
|
||||
if not segments['lvar'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['lvar'] = True
|
||||
if profile:
|
||||
write_prof_data[hat]['lvar'].pop(list_var)
|
||||
else:
|
||||
write_filelist['lvar'].pop(list_var)
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
|
||||
elif re_match_include(line):
|
||||
include_name = re_match_include(line)
|
||||
if profile:
|
||||
if write_prof_data[hat]['include'].get(include_name, False):
|
||||
if not segments['include'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['include'] = True
|
||||
write_prof_data[hat]['include'].pop(include_name)
|
||||
data.append(line)
|
||||
else:
|
||||
if write_filelist['include'].get(include_name, False):
|
||||
write_filelist['include'].pop(include_name)
|
||||
data.append(line)
|
||||
|
||||
elif NetworkRule.match(line):
|
||||
network_obj = NetworkRule.parse(line)
|
||||
if write_prof_data[hat]['network'].is_covered(network_obj, True, True):
|
||||
if not segments['network'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['network'] = True
|
||||
write_prof_data[hat]['network'].delete(network_obj)
|
||||
data.append(line)
|
||||
|
||||
elif RE_PROFILE_CHANGE_HAT.search(line):
|
||||
# "^hat," declarations are no longer supported, ignore them and don't write out the line
|
||||
# (parse_profile_data() already prints a warning about that)
|
||||
pass
|
||||
elif RE_PROFILE_HAT_DEF.search(line):
|
||||
matches = RE_PROFILE_HAT_DEF.search(line)
|
||||
in_contained_hat = True
|
||||
hat = matches.group('hat')
|
||||
hat = strip_quotes(hat)
|
||||
flags = matches.group('flags')
|
||||
|
||||
if not write_prof_data[hat]['flags'] == flags:
|
||||
correct = False
|
||||
if not write_filelist['profile'][profile][hat]:
|
||||
correct = False
|
||||
if correct:
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
elif FileRule.match(line):
|
||||
# leading permissions could look like a keyword, therefore handle file rules after everything else
|
||||
file_obj = FileRule.parse(line)
|
||||
|
||||
if write_prof_data[hat]['file'].is_covered(file_obj, True, True):
|
||||
if not segments['file'] and True in segments.values():
|
||||
data += write_prior_segments(write_prof_data[name], segments, line)
|
||||
segments['file'] = True
|
||||
write_prof_data[hat]['file'].delete(file_obj)
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
|
||||
else:
|
||||
if correct:
|
||||
data.append(line)
|
||||
else:
|
||||
#To-Do
|
||||
pass
|
||||
# data.append('prof done')
|
||||
# if write_filelist:
|
||||
# data += write_alias(write_filelist, 0)
|
||||
# data += write_list_vars(write_filelist, 0)
|
||||
# data += write_includes(write_filelist, 0)
|
||||
# data.append('from filelist over')
|
||||
# data += write_piece(write_prof_data, 0, name, name, include_flags)
|
||||
|
||||
string += '\n'.join(data)
|
||||
|
||||
return string + '\n'
|
||||
|
||||
def write_profile_ui_feedback(profile):
|
||||
aaui.UI_Info(_('Writing updated profile for %s.') % profile)
|
||||
write_profile(profile)
|
||||
|
Loading…
x
Reference in New Issue
Block a user