2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-30 22:05:27 +00:00

Intermediate: codebase update with broken tests

This commit is contained in:
Kshitij Gupta
2013-07-30 20:13:08 +05:30
parent 1af5f1f03f
commit 928e4503c6
4 changed files with 696 additions and 79 deletions

88
Testing/aa_test.py Normal file
View File

@@ -0,0 +1,88 @@
'''
Created on Jul 29, 2013
@author: kshitij
'''
import unittest
import sys
sys.path.append('../')
import apparmor.aa
#from apparmor.aa import parse_event
class Test(unittest.TestCase):
def test_parse_event(self):
event = 'type=AVC msg=audit(1345027352.096:499): apparmor="ALLOWED" operation="rename_dest" parent=6974 profile="/usr/sbin/httpd2-prefork//vhost_balmar" name=2F686F6D652F7777772F62616C6D61722E646174616E6F766F322E64652F68747470646F63732F6A6F6F6D6C612F696D616765732F6B75656368656E2F666F746F20322E6A7067 pid=20143 comm="httpd2-prefork" requested_mask="wc" denied_mask="wc" fsuid=30 ouid=30'
parsed_event = apparmor.aa.parse_event(event)
print(parsed_event)
event = 'type=AVC msg=audit(1322614912.304:857): apparmor="ALLOWED" operation="getattr" parent=16001 profile=74657374207370616365 name=74657374207370616365 pid=17011 comm="bash" requested_mask="r" denied_mask="r" fsuid=0 ouid=0'
parsed_event = apparmor.aa.parse_event(event)
print(parsed_event)
event = 'type=AVC msg=audit(1322614918.292:4376): apparmor="ALLOWED" operation="file_perm" parent=16001 profile=74657374207370616365 name="/home/jj/.bash_history" pid=17011 comm="bash" requested_mask="w" denied_mask="w" fsuid=0 ouid=1000'
parsed_event = apparmor.aa.parse_event(event)
print(parsed_event)
def test_modes_to_string(self):
self.assertEqual(apparmor.aa.mode_to_str(32270), 'rwPCUx')
MODE_TEST = {'x': apparmor.aa.AA_MAY_EXEC,
'w': apparmor.aa.AA_MAY_WRITE,
'r': apparmor.aa.AA_MAY_READ,
'a': apparmor.aa.AA_MAY_APPEND,
'l': apparmor.aa.AA_MAY_LINK,
'k': apparmor.aa.AA_MAY_LOCK,
'm': apparmor.aa.AA_EXEC_MMAP,
'i': apparmor.aa.AA_EXEC_INHERIT,
'u': apparmor.aa.AA_EXEC_UNCONFINED + apparmor.aa.AA_EXEC_UNSAFE, # Unconfined + Unsafe
'U': apparmor.aa.AA_EXEC_UNCONFINED,
'p': apparmor.aa.AA_EXEC_PROFILE + apparmor.aa.AA_EXEC_UNSAFE, # Profile + unsafe
'P': apparmor.aa.AA_EXEC_PROFILE,
'c': apparmor.aa.AA_EXEC_CHILD + apparmor.aa.AA_EXEC_UNSAFE, # Child + Unsafe
'C': apparmor.aa.AA_EXEC_CHILD,
#'n': AA_EXEC_NT + AA_EXEC_UNSAFE,
#'N': AA_EXEC_NT
}
while MODE_TEST:
string,mode = MODE_TEST.popitem()
self.assertEqual(apparmor.aa.mode_to_str(mode), string)
mode = 2048
self.assertEqual(apparmor.aa.mode_to_str(mode), 'C')
def test_string_to_modes(self):
#self.assertEqual(apparmor.aa.str_to_mode('wc'), 32270)
MODE_TEST = {'x': apparmor.aa.AA_MAY_EXEC,
'w': apparmor.aa.AA_MAY_WRITE,
'r': apparmor.aa.AA_MAY_READ,
'a': apparmor.aa.AA_MAY_APPEND,
'l': apparmor.aa.AA_MAY_LINK,
'k': apparmor.aa.AA_MAY_LOCK,
'm': apparmor.aa.AA_EXEC_MMAP,
'i': apparmor.aa.AA_EXEC_INHERIT,
'u': apparmor.aa.AA_EXEC_UNCONFINED + apparmor.aa.AA_EXEC_UNSAFE, # Unconfined + Unsafe
'U': apparmor.aa.AA_EXEC_UNCONFINED,
'p': apparmor.aa.AA_EXEC_PROFILE + apparmor.aa.AA_EXEC_UNSAFE, # Profile + unsafe
'P': apparmor.aa.AA_EXEC_PROFILE,
'c': apparmor.aa.AA_EXEC_CHILD + apparmor.aa.AA_EXEC_UNSAFE, # Child + Unsafe
'C': apparmor.aa.AA_EXEC_CHILD,
#'n': AA_EXEC_NT + AA_EXEC_UNSAFE,
#'N': AA_EXEC_NT
}
#while MODE_TEST:
# string,mode = MODE_TEST.popitem()
# self.assertEqual(apparmor.aa.str_to_mode(string), mode)
#self.assertEqual(apparmor.aa.str_to_mode('C'), 2048)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()

View File

@@ -1,4 +1,4 @@
#4925
#5546
#382-430
#480-525
# No old version logs, only 2.6 + supported
@@ -451,7 +451,7 @@ def create_new_profile(localfile):
if interpreter == 'perl':
local_profile[localfile]['include']['abstractions/perl'] = True
elif re.search('python([23]|[23]\.[0-9])?$', interpreter):
elif re.search('^python([23]|[23]\.[0-9]+)?$', interpreter):
local_profile[localfile]['include']['abstractions/python'] = True
elif interpreter == 'ruby':
local_profile[localfile]['include']['abstractions/ruby'] = True
@@ -1631,10 +1631,8 @@ def read_log(logmark):
def parse_event(msg):
"""Parse the event from log into key value pairs"""
msg = msg.strip()
debug_logger.log('parse_event: %s' % msg)
#debug_logger.log('parse_event: %s' % msg)
event = LibAppArmor.parse_record(msg)
rmask = None
dmask = None
ev = dict()
ev['resource'] = event.info
ev['active_hat'] = event.active_hat
@@ -1668,8 +1666,9 @@ def parse_event(msg):
dmask = dmask.replace('d', 'w')
if not validate_log_mode(hide_log_mode(dmask)):
fatal_error(gettext('Log contains unknown mode %s') % dmask)
#print('parse_event:', ev['profile'], dmask, ev['name2'])
mask, name = log_str_to_mode(ev['profile'], dmask, ev['name2'])
ev['denied_mask'] = mask
ev['name2'] = name
@@ -1678,11 +1677,12 @@ def parse_event(msg):
ev['name2'] = name
if not ev['time']:
ev['time'] = int(time.time)
ev['time'] = int(time.time())
# Remove None keys
#for key in ev.keys():
# if not ev[key] or not re.search('[\w]+', ev[key]):
# ev.pop(key)
if ev['aamode']:
# Convert aamode values to their counter-parts
mode_convertor = {
@@ -1695,12 +1695,12 @@ def parse_event(msg):
6: 'STATUS'
}
try:
ev['aamode'] = mode_convertor(ev['aamode'])
ev['aamode'] = mode_convertor[ev['aamode']]
except KeyError:
ev['aamode'] = None
if ev['aamode']:
debug_logger.debug(ev)
#debug_logger.debug(ev)
return ev
else:
return None
@@ -1963,7 +1963,7 @@ def ask_the_questions():
if incn == incname:
include_valid = True
if incname.startswith('abstractions/'):
if valid_abstraction(incname):
include_valid = True
if not include_valid:
@@ -2172,20 +2172,23 @@ def ask_the_questions():
if newpath[-4:] == '/**/' or newpath[-3:] == '/*/':
# /foo/**/ and /foo/*/ => /**/
newpath = re.sub('/[^/]+/\*{1,2}/$', '/**/', newpath) #re.sub('/[^/]+/\*{1,2}$/', '/\*\*/', newpath)
# /foo**/ => /**/
elif re.search('/[^/]+\*\*/$', newpath):
newpath = re.sub('/[^/]+\*\*/$', '/**/', newpath)
elif re.search('/[^/]+\*\*[^/]*/$', newpath):
# /foo**/ and /foo**bar/ => /**/
newpath = re.sub('/[^/]+\*\*[^/]*/$', '/**/', newpath)
elif re.search('/\*\*[^/]+/$', newpath):
# /**bar/ => /**/
newpath = re.sub('/\*\*[^/]+/$', '/**/', newpath)
else:
newpath = re.sub('/[^/]+/$', '/*/', newpath)
else:
# /foo/** and /foo/* => /**
else:
if newpath[-3:] == '/**' or newpath[-2:] == '/*':
newpath = re.sub('/[^/]+/\*{1,2}$', '/**', newpath)
# /**foo => /**
elif re.search('/\*\*[^/]+$', newpath):
newpath = re.sub('/\*\*[^/]+$', '/**', newpath)
# /foo** => /**
# /foo/** and /foo/* => /**
newpath = re.sub('/[^/]+/\*{1,2}$', '/**', newpath)
elif re.search('/[^/]*\*\*[^/]+$', newpath):
# /**foo and /foor**bar => /**
newpath = re.sub('/[^/]*\*\*[^/]+$', '/**', newpath)
elif re.search('/[^/]+\*\*$', newpath):
# /foo** => /**
newpath = re.sub('/[^/]+\*\*$', '/**', newpath)
else:
newpath = re.sub('/[^/]+$', '/*', newpath)
@@ -2198,18 +2201,22 @@ def ask_the_questions():
newpath = options[selected].strip()
if not re_match_include(newpath):
# match /**.ext and /*.ext
match = re.search('/\*{1,2}(\.[^/]+)$', newpath)
match = re.search('/\*{1,2}(\.[^/]+)$', newpath)
if match:
# /foo/**.ext and /foo/*.ext => /**.ext
newpath = re.sub('/[^/]+/\*{1,2}\.[^/]+$', '/**'+match.group()[0], newpath)
# /foo**.ext => /**.ext
elif re.search('/[^/]+\*\*\.[^/]+$'):
match = re.search('/[^/]+\*\*(\.[^/]+)$')
newpath = re.sub('/[^/]+\*\*\.[^/]+$', '/**'+match.groups()[0], newpath)
elif re.search('/[^/]+\*\*[^/]*\.[^/]+$'):
# /foo**.ext and /foo**bar.ext => /**.ext
match = re.search('/[^/]+\*\*[^/]*(\.[^/]+)$')
newpath = re.sub('/[^/]+\*\*[^/]*\.[^/]+$', '/**'+match.groups()[0], newpath)
elif re.search('/\*\*[^/]+\.[^/]+$'):
# /**foo.ext => /**.ext
match = re.search('/\*\*[^/]+(\.[^/]+)$')
newpath = re.sub('/\*\*[^/]+\.[^/]+$', '/**'+match.groups()[0], newpath)
else:
# /foo.ext => /*.ext
match = re.search('(\.[^/]+)$')
newpath = re.sub('/[^/]+(\.[^/]+)$', '/*'+match.groups()[0], newpath)
if newpath not in options:
options.append(newpath)
default_option = len(options)
@@ -2382,57 +2389,45 @@ def match_net_include(incname, family, type):
name = includelist.pop(0)
else:
name = False
return False
def match_cap_includes(profile, cap):
newincludes = []
includevalid = False
for incname in include.keys():
includevalid = False
if profile['include'].get(incname, False):
continue
if cfg['settings']['custom_includes']:
for incm in cfg['settings']['custom_includes'].split():
if incm in incname:
includevalid = True
if 'abstractions' in incname:
includevalid = True
if not includevalid:
continue
if include[incname][incname]['allow']['capability'][cap].get('set', False) == 1:
if valid_include(profile, incname) and include[incname][incname]['allow']['capability'][cap].get('set', False) == 1:
newincludes.append(incname)
return newincludes
def re_match_include(path):
"""Matches the path for include and returns the include path"""
regex_include = re.compile('^\s*#?include\s*<(\.*)>')
regex_include = re.compile('^\s*#?include\s*<(\.*)\s*(#.*)?$>')
match = regex_include.search(path)
if match:
return match.groups()[0]
else:
return None
def valid_include(profile, incname):
if profile['include'].get(incname, False):
return False
if cfg['settings']['custom_includes']:
for incm in cfg['settings']['custom_includes'].split():
if incm == incname:
return True
if incname.startswith('abstractions/') and os.path.isfile(profile_dir + '/' + incname):
return True
return False
def match_net_includes(profile, family, nettype):
newincludes = []
includevalid = False
for incname in include.keys():
includevalid = False
if profile['include'].get(incname, False):
continue
if cfg['settings']['custom_includes']:
for incm in cfg['settings']['custom_includes'].split():
if incm == incname:
includevalid = True
if incname.startswith('abstractions/'):
includevalid = True
if includevalid and match_net_include(incname, family, type):
if valid_include(profile, incname) and match_net_include(incname, family, type):
newincludes.append(incname)
return newincludes
@@ -2564,22 +2559,20 @@ def get_pager():
pass
def generate_diff(oldprofile, newprofile):
oldtemp = tempfile.NamedTemporaryFile('wr', delete=False)
oldtemp = tempfile.NamedTemporaryFile('wr')
oldtemp.write(oldprofile)
oldtemp.flush()
newtemp = tempfile.NamedTemporaryFile('wr', delete=False)
newtemp = tempfile.NamedTemporaryFile('wr')
newtemp.write(newprofile)
newtemp.flush()
difftemp = tempfile.NamedTemporaryFile('wr', deleted=False)
difftemp = tempfile.NamedTemporaryFile('wr', delete=False)
subprocess.call('diff -u %s %s > %s' %(oldtemp.name, newtemp.name, difftemp.name), shell=True)
subprocess.call('diff -u -p %s %s > %s' %(oldtemp.name, newtemp.name, difftemp.name), shell=True)
oldtemp.delete = True
oldtemp.close()
newtemp.delete = True
newtemp.close()
return difftemp
@@ -2588,7 +2581,7 @@ def get_profile_diff(oldprofile, newprofile):
diff = []
with open_file_read(difftemp.name) as f_in:
for line in f_in:
if not (line.startswith('---') and line .startswith('+++') and re.search('^\@\@.*\@\@$', line)):
if not (line.startswith('---') and line .startswith('+++') and line.startswith('@@')):
diff.append(line)
difftemp.delete = True
@@ -2605,18 +2598,19 @@ def display_changes(oldprofile, newprofile):
difftemp.close()
def set_process(pid, profile):
# If process running don't do anything
if os.path.exists('/proc/%s/attr/current' % pid):
# If process not running don't do anything
if not os.path.exists('/proc/%s/attr/current' % pid):
return None
process = None
try:
process = open_file_read('/proc/%s/attr/current')
process = open_file_read('/proc/%s/attr/current' % pid)
except IOError:
return None
current = process.readline().strip()
process.close()
if not re.search('null(-complain)*-profile', current):
if not re.search('^null(-complain)*-profile$', current):
return None
stats = None
@@ -2687,7 +2681,7 @@ def commonsuffix(old, new):
# Weird regex
pass
def spilt_log_mode(mode):
def split_log_mode(mode):
user = ''
other = ''
match = re.search('(.*?)::(.*)', mode)
@@ -2696,7 +2690,7 @@ def spilt_log_mode(mode):
else:
user = mode
other = mode
#print ('split_logmode:', user, mode)
return user, other
def map_log_mode(mode):
@@ -2728,11 +2722,12 @@ def sub_str_to_mode(string):
mode = 0
if not string:
return mode
while str:
while string:
pattern = '(%s)' % MODE_MAP_RE.pattern
tmp = re.search(pattern, string).groups()[0]
re.sub(pattern, '', string)
tmp = re.search(pattern, string)
if tmp:
tmp = tmp.groups()[0]
string = re.sub(pattern, '', string)
if tmp and MODE_HASH.get(tmp, False):
mode |= MODE_HASH[tmp]
else:
@@ -2753,15 +2748,19 @@ def str_to_mode(string):
if not user:
user = other
mode = sub_str_to_mode(user)
#print(string, mode)
#print(string, 'other', sub_str_to_mode(other))
mode |= (sub_str_to_mode(other) << AA_OTHER_SHIFT)
#print (string, mode)
#print('str_to_mode:', mode)
return mode
def log_str_to_mode(profile, string, nt_name):
mode = str_to_mode(string)
# If contains nx and nix
#print (profile, string, nt_name)
if contains(mode, 'Nx'):
# Transform to px, cx
match = re.search('(.+?)//(.+?)', nt_name)
@@ -2977,3 +2976,533 @@ def attach_profile_data(profiles, profile_data):
# arising due to mutables
for p in profile_data.keys():
profiles[p] = deepcopy(profile_data[p])
def parse_profile_data(data, file, do_include):
profile_data = hasher()
profile = None
hat = None
in_contained_hat = None
repo_data = None
parsed_profiles = []
initial_comment = ''
RE_PROFILE_START = re.compile('^\s*(("??\/.+?"??)|(profile\s+("??.+?"??)))\s+((flags=)?\((.+)\)\s+)*\{\s*(#.*)?$')
RE_PROFILE_END = re.compile('^\s*\}\s*(#.*)?$')
RE_PROFILE_CAP = re.compile('^\s*(audit\s+)?(deny\s+)?capability\s+(\S+)\s*,\s*(#.*)?$')
RE_PROFILE_SET_CAP = re.compile('^\s*set capability\s+(\S+)\s*,\s*(#.*)?$')
RE_PROFILE_LINK = re.compile('^\s*(audit\s+)?(deny\s+)?link\s+(((subset)|(<=))\s+)?([\"\@\/].*?"??)\s+->\s*([\"\@\/].*?"??)\s*,\s*(#.*)?$')
RE_PROFILE_CHANGE_PROFILE = re.compile('^\s*change_profile\s+->\s*("??.+?"??),(#.*)?$')
RE_PROFILE_ALIAS = re.compile('^\s*alias\s+("??.+?"??)\s+->\s*("??.+?"??)\s*,(#.*)?$')
RE_PROFILE_RLIMIT = re.compile('^\s*set\s+rlimit\s+(.+)\s+<=\s*(.+)\s*,(#.*)?$')
RE_PROFILE_BOOLEAN = re.compile('^\s*(\$\{?[[:alpha:]][[:alnum:]_]*\}?)\s*=\s*(true|false)\s*,?\s*(#.*)?$')
RE_PROFILE_VARIABLE = re.compile('^\s*(@\{?[[:alpha:]][[:alnum:]_]+\}?)\s*\+?=\s*(.+?)\s*,?\s*(#.*)?$')
RE_PROFILE_CONDITIONAL = re.compile('^\s*if\s+(not\s+)?(\$\{?[[:alpha:]][[:alnum:]_]*\}?)\s*\{\s*(#.*)?$')
RE_PROFILE_CONDITIONAL_VARIABLE = re.compile('^\s*if\s+(not\s+)?defined\s+(@\{?[[:alpha:]][[:alnum:]_]+\}?)\s*\{\s*(#.*)?$')
RE_PROFILE_CONDITIONAL_BOOLEAN = re.compile('^\s*if\s+(not\s+)?defined\s+(\$\{?[[:alpha:]][[:alnum:]_]+\}?)\s*\{\s*(#.*)?$')
RE_PROFILE_PATH_ENTRY = re.compile('^\s*(audit\s+)?(deny\s+)?(owner\s+)?([\"\@\/].*?)\s+(\S+)(\s+->\s*(.*?))?\s*,\s*(#.*)?$')
RE_PROFILE_NETWORK = re.compile('^\s*(audit\s+)?(deny\s+)?network(.*)\s*(#.*)?$')
RE_PROFILE_CHANGE_HAT = re.compile('^\s*\^(\"??.+?\"??)\s*,\s*(#.*)?$')
RE_PROFILE_HAT_DEF = re.compile('^\s*\^(\"??.+?\"??)\s+((flags=)?\((.+)\)\s+)*\{\s*(#.*)?$')
if do_include:
profile = file
hat = file
for lineno, line in enumerate(data):
line = line.strip()
if not line:
continue
# Starting line of a profile
if RE_PROFILE_START.search(line):
matches = RE_PROFILE_START.search(line).groups()
if profile:
if profile != hat or matches[3]:
raise AppArmorException('%s profile in %s contains syntax errors in line: %s.\n' % (profile, file, lineno+1))
# Keep track of the start of a profile
if profile and profile == hat and matches[3]:
# local profile
hat = matches[3]
in_contained_hat = True
profile_data[profile][hat]['profile'] = True
else:
if matches[1]:
profile = matches[1]
else:
profile = matches[3]
profile, hat = profile.split('//')[:2]
in_contained_hat = False
if hat:
profile_data[profile][hat]['external'] = True
else:
hat = profile
flags = matches[6]
profile = strip_quotes(profile)
if hat:
hat = strip_quotes(hat)
# save profile name and filename
profile_data[profile][hat]['name'] = profile
profile_data[profile][hat]['filename'] = file
filelist[file]['profiles'][profile][hat] = True
profile_data[profile][hat]['flags'] = flags
profile_data[profile][hat]['allow']['netdomain'] = hasher()
profile_data[profile][hat]['allow']['path'] = hasher()
# Save the initial comment
if initial_comment:
profile_data[profile][hat]['initial_comment'] = initial_comment
initial_comment = ''
if repo_data:
profile_data[profile][profile]['repo']['url'] = repo_data['url']
profile_data[profile][profile]['repo']['user'] = repo_data['user']
elif RE_PROFILE_END.search(line):
# If profile ends and we're not in one
if not profile:
raise AppArmorException('Syntax Error: Unexpected End of Profile reached in file: %s line: %s' % (file, lineno+1))
if in_contained_hat:
hat = profile
in_contained_hat = False
else:
parsed_profiles.append(profile)
profile = None
initial_comment = ''
elif RE_PROFILE_CAP.search(line):
matches = RE_PROFILE_CAP.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected capability entry found in file: %s line: %s' % (file, lineno+1))
audit = False
if matches[0]:
audit = True
allow = 'allow'
if matches[1]:
allow = 'deny'
capability = matches[2]
profile_data[profile][hat][allow]['capability'][capability]['set'] = True
profile_data[profile][hat][allow]['capability'][capability]['audit'] = audit
elif RE_PROFILE_SET_CAP.search(line):
matches = RE_PROFILE_SET_CAP.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected capability entry found in file: %s line: %s' % (file, lineno+1))
capability = matches[0]
profile_data[profile][hat]['set_capability'][capability] = True
elif RE_PROFILE_LINK.ssearch(line):
matches = RE_PROFILE_LINK.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected link entry found in file: %s line: %s' % (file, lineno+1))
audit = False
if matches[0]:
audit = True
allow = 'allow'
if matches[1]:
allow = 'deny'
subset = matches[3]
link = strip_quotes(matches[6])
value = strip_quotes(matches[7])
profile_data[profile][hat][allow]['link'][link]['to'] = value
profile_data[profile][hat][allow]['link'][link]['mode'] = profile_data[profile][hat][allow]['link'][link].get('mode', 0) | AA_MAY_LINK
if subset:
profile_data[profile][hat][allow]['link'][link]['mode'] |= AA_LINK_SUBSET
if audit:
profile_data[profile][hat][allow]['link'][link]['audit'] = profile_data[profile][hat][allow]['link'][link].get('audit', 0) | AA_LINK_SUBSET
else:
profile_data[profile][hat][allow]['link'][link]['audit'] = 0
elif RE_PROFILE_CHANGE_PROFILE.search(line):
matches = RE_PROFILE_CHANGE_PROFILE.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected change profile entry found in file: %s line: %s' % (file, lineno+1))
cp = strip_quotes(matches[0])
profile_data[profile][hat]['changes_profile'][cp] = True
elif RE_PROFILE_ALIAS.search(line):
matches = RE_PROFILE_ALIAS.search(line).groups()
from_name = strip_quotes(matches[0])
to_name = strip_quotes(matches[1])
if profile:
profile_data[profile][hat]['alias'][from_name] = to_name
else:
if not filelist.get(file, False):
filelist[file] = hasher()
filelist[file]['alias'][from_name] = to_name
elif RE_PROFILE_RLIMIT.search(line):
matches = RE_PROFILE_RLIMIT.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected rlimit entry found in file: %s line: %s' % (file, lineno+1))
from_name = matches[0]
to_name = matches[1]
profile_data[profile][hat]['rlimit'][from_name] = to_name
elif RE_PROFILE_BOOLEAN.search(line, flags=re.IGNORECASE):
matches = RE_PROFILE_BOOLEAN.search(line, flags=re.IGNORECASE)
if not profile:
raise AppArmorException('Syntax Error: Unexpected boolean definition found in file: %s line: %s' % (file, lineno+1))
bool_var = matches[0]
value = matches[1]
profile_data[profile][hat]['lvar'][bool_var] = value
elif RE_PROFILE_VARIABLE.search(line):
# variable additions += and =
matches = RE_PROFILE_VARIABLE.search(line)
list_var = strip_quotes(matches[0])
value = strip_quotes(matches[1])
if profile:
if not profile_data[profile][hat].get('lvar', False):
profile_data[profile][hat]['lvar'][list_var] = []
store_list_var(profile_data[profile]['lvar'], list_var, value)
else:
if not filelist[file].get('lvar', False):
filelist[file]['lvar'][list_var] = []
store_list_var(filelist[file]['lvar'], list_var, value)
elif RE_PROFILE_CONDITIONAL.search(line):
# Conditional Boolean
pass
elif RE_PROFILE_CONDITIONAL_VARIABLE.search(line):
# Conditional Variable defines
pass
elif RE_PROFILE_CONDITIONAL_BOOLEAN.search(line):
# Conditional Boolean defined
pass
elif RE_PROFILE_PATH_ENTRY.search(line):
matches = RE_PROFILE_PATH_ENTRY.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected path entry found in file: %s line: %s' % (file, lineno+1))
audit = False
if matches[0]:
audit = True
allow = 'allow'
if matches[1]:
allow = 'deny'
user = False
if matches[2]:
user = True
path = matches[3].strip()
mode = matches[4]
nt_name = matches[6]
if nt_name:
nt_name = nt_name.strip()
p_re = convert_regexp(path)
try:
re.compile(p_re)
except:
raise AppArmorException('Syntax Error: Invalid Regex %s in file: %s line: %s' % (path, file, lineno+1))
if not validate_profile_mode(mode, allow, nt_name):
raise AppArmorException('Invalid mode %s in file: %s line: %s' % (mode, file, lineno+1))
tmpmode = None
if user:
tmpmode = str_to_mode('%s::' % mode)
else:
tmpmode = str_to_mode(mode)
profile_data[profile][hat][allow]['path'][path]['mode'] = profile_data[profile][hat][allow]['path'][path].get('mode', 0) | tmpmode
if nt_name:
profile_data[profile][hat][allow]['path'][path]['to'] = nt_name
if audit:
profile_data[profile][hat][allow]['path'][path]['audit'] = profile_data[profile][hat][allow]['path'][path].get('audit') | tmpmode
else:
profile_data[profile][hat][allow]['path'][path]['audit'] = 0
elif re_match_include(line):
# Include files
include = re_match_include(line)
if profile:
profile_data[profile][hat]['include'][include] = True
else:
if not filelist.get(file):
filelist[file] = hasher()
filelist[file]['include'][include] = True
# If include is a directory
if os.path.isdir(profile_dir + '/' + include):
for path in os.listdir(profile_dir + '/' + include):
path = path.strip()
if is_skippable_file(path):
continue
if os.path.isfile(profile_dir + '/' + include + '/' + path):
file_name = include + '/' + path
load_include(file_name)
else:
load_include(include)
elif RE_PROFILE_NETWORK.search(line):
matches = RE_PROFILE_NETWORK.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected network entry found in file: %s line: %s' % (file, lineno+1))
audit = False
if matches[0]:
audit = True
allow = 'allow'
if matches[1]:
allow = 'deny'
network = matches[2]
if re.search('\s+(\S+)\s+(\S+)\s*,\s*(#.*)?$', network):
nmatch = re.search(network, '\s+(\S+)\s+(\S+)\s*,\s*(#.*)?$').groups()
fam, typ = nmatch[:2]
profile_data[profile][hat][allow]['netdomain']['rule'][fam][typ] = True
profile_data[profile][hat][allow]['netdomain']['audit'][fam][typ] = audit
elif re.search('\s+(\S+)\s*,\s*(#.*)?$', network):
fam = re.search('\s+(\S+)\s*,\s*(#.*)?$', network).groups()[0]
profile_data[profile][hat][allow]['netdomain']['rule'][fam] = True
profile_data[profile][hat][allow]['netdomain']['audit'][fam] = audit
else:
profile_data[profile][hat][allow]['netdomain']['rule']['all'] = True
profile_data[profile][hat][allow]['netdomain']['audit']['all'] = audit # True
elif RE_PROFILE_CHANGE_HAT.search(line):
matches = RE_PROFILE_CHANGE_HAT.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected change hat declaration found in file: %s line: %s' % (file, lineno+1))
hat = matches[0]
hat = strip_quotes(hat)
if not profile_data[profile][hat].get('declared', False):
profile_data[profile][hat]['declared'] = True
elif RE_PROFILE_HAT_DEF.search(line):
# An embedded hat syntax definition starts
matches = RE_PROFILE_HAT_DEF.search(line).groups()
if not profile:
raise AppArmorException('Syntax Error: Unexpected hat definition found in file: %s line: %s' % (file, lineno+1))
in_contained_hat = True
hat = matches[0]
hat = strip_quotes(hat)
flags = matches[3]
profile_data[profile][hat]['flags'] = flags
profile_data[profile][hat]['declared'] = False
#profile_data[profile][hat]['allow']['path'] = hasher()
#profile_data[profile][hat]['allow']['netdomain'] = hasher()
if initial_comment:
profile_data[profile][hat]['initial_comment'] = initial_comment
initial_comment = ''
filelist[file]['profiles'][profile][hat] = True
elif line[0] == '#':
# Handle initial comments
if not profile:
if line.startswith('# vim:syntax') or line.startswith('# Last Modified:'):
continue
line = line.split()
if line[1] == 'REPOSITORY:':
if len(line) == 3:
repo_data = {'neversubmit': True}
elif len(line) == 5:
repo_data = {'url': line[2],
'user': line[3],
'id': line[4]}
else:
initial_comment = line + '\n'
else:
raise AppArmorException('Syntax Error: Unknown line found in file: %s line: %s' % (file, lineno+1))
# Below is not required I'd say
if not do_include:
for hatglob in cfg['required_hats'].keys():
for parsed_prof in sorted(parsed_profiles):
if re.search(hatglob, parsed_prof):
for hat in cfg['required_hats'][hatglob].split():
if not profile_data[parsed_prof].get(hat, False):
profile_data[parsed_prof][hat] = hasher()
# End of file reached but we're stuck in a profile
if profile and not do_include:
raise AppArmorException('Syntax Error: Reached end of file %s while inside profile %s' % (file, profile))
return profile_data
def separate_vars(vs):
data = []
RE_VARS = re.compile('\s*((\".+?\")|([^\"]\S+))\s*(.*)$')
while RE_VARS.search(vs):
matches = RE_VARS.search(vs).groups()
data.append(strip_quotes(matches[0]))
vs = matches[3]
return data
def is_active_profile(pname):
if aa.get(pname, False):
return True
else:
return False
def store_list_var(var, list_var, value):
vlist = separate_vars(value)
if var.get(list_var):
vlist += var[list_var] #vlist = (vlist, var[list_var])
vlist = list(set(vlist))
var[list_var] = vlist
def strip_quotes(data):
if data[0]+data[-1] == '""':
return data[1:-1]
def quote_if_needed(data):
# quote data if it contains whitespace
if ' ' in data:
data = '"' + data + '"'
return data
def escape(escape):
escape = strip_quotes(escape)
escape = re.sub('((?<!\\))"', r'\1\\', escape)
if re.search('(\s|^$|")', escape):
return '"%s"' % escape
return escape
def write_header(profile_data, depth, name, embedded_hat, write_flags):
pre = ' ' * depth
data = []
name = quote_if_needed(name)
if (not embedded_hat and re.search('^[^\/]|^"[^\/]', name)) or (embedded_hat and re.search('^[^^]' ,name)):
name = 'profile %s' % name
if write_flags and profile_data['flags']:
data.append('%s%s flags(%s) {' % (pre, name, profile_data['flags']))
else:
data.append('%s%s {' % (pre, name))
return data
def write_single(profile_data, depth, allow, name, prefix, tail):
pre = ' ' * depth
data = []
ref, allow = set_ref_allow(profile_data, allow)
if ref.get(name, False):
for key in sorted(re[name].keys()):
qkey = quote_if_needed(key)
data.append(pre + allow + prefix + qkey + tail)
if ref[name].keys():
data.append('')
return data
def set_ref_allow(profile_data, allow):
if allow:
if allow == 'deny':
return profile_data[allow], 'deny '
else:
return profile_data[allow], ''
else:
return profile_data, ''
def write_pair(profile_data, depth, allow, name, prefix, sep, tail, fn):
pre = ' ' * depth
data = []
ref, allow = set_ref_allow(profile_data, allow)
if ref.get(name, False):
for key in sorted(re[name].keys()):
value = fn(ref[name][key])#eval('%s(%s)' % (fn, ref[name][key]))
data.append(pre + allow + prefix + key + sep + value)
if ref[name].keys():
data.append('')
return data
def write_includes(prof_data, depth):
return write_single(prof_data, depth, '', 'include', '#include <', '>')
def write_change_profile(prof_data, depth):
return write_single(prof_data, depth, '', 'change_profile', 'change_profile -> ', ',')
def write_alias(prof_data, depth):
return write_pair(prof_data, depth, '', 'alias', 'alias ', ' -> ', ',', quote_if_needed)
def write_rlimits(prof_data, depth):
return write_pair(prof_data, depth, '', 'rlimit', 'set rlimit ', ' <= ', ',', quote_if_needed)
def var_transform(ref):
data = []
for value in ref:
data.append(quote_if_needed(value))
return ' '.join(data)
def write_list_vars(prof_data, depth):
return write_pair(prof_data, depth, '', 'lvar', '', ' = ', '', var_transform)
def write_cap_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = ''
if allow == 'deny':
allowstr = 'deny'
if prof_data[allow].get('capability', False):
for cap in sorted(prof_data[allow]['capability'].keys()):
audit = ''
if prof_data[allow]['capability'][cap].get('audit', False):
audit = 'audit'
if prof_data[allow]['capability'][cap].get('set', False):
data.append(pre + audit + allowstr + 'capability,')
data.append('')
return data
def write_capabilities(prof_data, depth):
data = write_single(prof_data, depth, '', 'set_capability', 'set capability ', ',')
data += write_cap_rules(prof_data, depth, 'deny')
data += write_cap_rules(prof_data, depth, 'allow')
return data

View File

@@ -4,7 +4,7 @@ import gettext
import locale
import logging
import os
from yasti import yastLog, SendDataToYast, GetDataFromYast
from apparmor.yasti import yastLog, SendDataToYast, GetDataFromYast
from apparmor.common import readkey
@@ -226,7 +226,7 @@ CMDS = {
'CMD_OVERWRITE': '(O)verwrite Profile',
'CMD_KEEP': '(K)eep Profile',
'CMD_CONTINUE': '(C)ontinue',
'CMD_IGNORE_ENTRY': '(I)gnore Entry'
'CMD_IGNORE_ENTRY': '(I)gnore'
}
def UI_PromptUser(q):

View File

@@ -1,5 +1,5 @@
import re
import ycp
#import ycp
import os
import sys
import logging