From fde015435af2978b3a356056d8b957fa0eaa2d49 Mon Sep 17 00:00:00 2001 From: Eric Chiang Date: Wed, 28 Nov 2018 11:04:49 -0800 Subject: [PATCH] utils: add support to tools for profiles with xattrs Signed-off-by: Eric Chiang --- utils/apparmor/aa.py | 13 +++- utils/apparmor/profile_storage.py | 1 + utils/apparmor/regex.py | 8 ++- utils/test/test-aa.py | 101 +++++++++++++++++++++++++++--- 4 files changed, 110 insertions(+), 13 deletions(-) diff --git a/utils/apparmor/aa.py b/utils/apparmor/aa.py index fd44c8092..ce5eff27a 100644 --- a/utils/apparmor/aa.py +++ b/utils/apparmor/aa.py @@ -683,6 +683,7 @@ def change_profile_flags(prof_filename, program, flag, set_flag): 'flags': newflags, 'profile_keyword': matches['profile_keyword'], 'header_comment': matches['comment'] or '', + 'xattrs': matches['xattrs'], } line = write_header(header_data, len(space)/2, profile, False, True) line = '%s\n' % line[0] @@ -2175,8 +2176,9 @@ def parse_profile_start(line, file, lineno, profile, hat): attachment = matches['attachment'] flags = matches['flags'] + xattrs = matches['xattrs'] - return (profile, hat, attachment, flags, in_contained_hat, pps_set_profile, pps_set_hat_external) + return (profile, hat, attachment, xattrs, flags, in_contained_hat, pps_set_profile, pps_set_hat_external) def parse_profile_data(data, file, do_include): profile_data = hasher() @@ -2204,7 +2206,7 @@ def parse_profile_data(data, file, do_include): lastline = None # Starting line of a profile if RE_PROFILE_START.search(line): - (profile, hat, attachment, flags, in_contained_hat, pps_set_profile, pps_set_hat_external) = parse_profile_start(line, file, lineno, profile, hat) + (profile, hat, attachment, xattrs, flags, in_contained_hat, pps_set_profile, pps_set_hat_external) = parse_profile_start(line, file, lineno, profile, hat) if profile_data[profile].get(hat, False): raise AppArmorException('Profile %(profile)s defined twice in %(file)s, last found in line %(line)s' % @@ -2224,6 +2226,7 @@ def parse_profile_data(data, file, do_include): profile_data[profile][hat]['filename'] = file filelist[file]['profiles'][profile][hat] = True + profile_data[profile][hat]['xattrs'] = xattrs profile_data[profile][hat]['flags'] = flags # Save the initial comment @@ -2634,11 +2637,15 @@ def write_header(prof_data, depth, name, embedded_hat, write_flags): if (not embedded_hat and re.search('^[^/]', unquoted_name)) or (embedded_hat and re.search('^[^^]', unquoted_name)) or prof_data['attachment'] or prof_data['profile_keyword']: name = 'profile %s%s' % (name, attachment) + xattrs = '' + if prof_data['xattrs']: + xattrs = ' xattrs=(%s)' % prof_data['xattrs'] + flags = '' if write_flags and prof_data['flags']: flags = ' flags=(%s)' % prof_data['flags'] - data.append('%s%s%s {%s' % (pre, name, flags, comment)) + data.append('%s%s%s%s {%s' % (pre, name, xattrs, flags, comment)) return data diff --git a/utils/apparmor/profile_storage.py b/utils/apparmor/profile_storage.py index 6ef4ca951..e715e7265 100644 --- a/utils/apparmor/profile_storage.py +++ b/utils/apparmor/profile_storage.py @@ -69,6 +69,7 @@ class ProfileStorage: data['filename'] = '' data['name'] = '' data['attachment'] = '' + data['xattrs'] = '' data['flags'] = '' data['external'] = False data['header_comment'] = '' # currently only set by change_profile_flags() diff --git a/utils/apparmor/regex.py b/utils/apparmor/regex.py index e3cdde365..6df66232d 100644 --- a/utils/apparmor/regex.py +++ b/utils/apparmor/regex.py @@ -30,6 +30,7 @@ RE_PATH = '/\S*|"/[^"]*"' # filename (starting with '/') withou RE_PROFILE_PATH = '(?P<%s>(' + RE_PATH + '))' # quoted or unquoted filename. %s is the match group name RE_PROFILE_PATH_OR_VAR = '(?P<%s>(' + RE_PATH + '|@{\S+}\S*|"@{\S+}[^"]*"))' # quoted or unquoted filename or variable. %s is the match group name RE_SAFE_OR_UNSAFE = '(?P(safe|unsafe))' +RE_XATTRS = '(\s+xattrs\s*=\s*\((?P([^)=]+=[^)=]+\s?)+)\)\s*)?' RE_PROFILE_END = re.compile('^\s*\}' + RE_EOL) RE_PROFILE_CAP = re.compile(RE_AUDIT_DENY + 'capability(?P(\s+\S+)+)?' + RE_COMMA_EOL) @@ -43,7 +44,7 @@ RE_PROFILE_CONDITIONAL_VARIABLE = re.compile('^\s*if\s+(not\s+)?defined\s+(@\{?\ RE_PROFILE_CONDITIONAL_BOOLEAN = re.compile('^\s*if\s+(not\s+)?defined\s+(\$\{?\w+\}?)\s*\{\s*(#.*)?$') RE_PROFILE_NETWORK = re.compile(RE_AUDIT_DENY + 'network(?P
\s+.*)?' + RE_COMMA_EOL) RE_PROFILE_CHANGE_HAT = re.compile('^\s*\^(\"??.+?\"??)' + RE_COMMA_EOL) -RE_PROFILE_HAT_DEF = re.compile('^(?P\s*)(?P\^|hat\s+)(?P\"??.+?\"??)\s+((flags=)?\((?P.+)\)\s+)*\{' + RE_EOL) +RE_PROFILE_HAT_DEF = re.compile('^(?P\s*)(?P\^|hat\s+)(?P\"??[^)]+?\"??)'+RE_XATTRS+'\s+((flags=)?\((?P[^)]+)\)\s+)*\{' + RE_EOL) RE_PROFILE_DBUS = re.compile(RE_AUDIT_DENY + '(dbus\s*,|dbus(?P
\s+[^#]*)\s*,)' + RE_EOL) RE_PROFILE_MOUNT = re.compile(RE_AUDIT_DENY + '((mount|remount|umount|unmount)(\s+[^#]*)?\s*,)' + RE_EOL) RE_PROFILE_SIGNAL = re.compile(RE_AUDIT_DENY + '(signal\s*,|signal(?P
\s+[^#]*)\s*,)' + RE_EOL) @@ -68,7 +69,8 @@ RE_PROFILE_START = re.compile( '|' + # or '(' + 'profile' + '\s+' + RE_PROFILE_NAME % 'namedprofile' + '(\s+' + RE_PROFILE_PATH_OR_VAR % 'attachment' + ')?' + ')' + # 'profile', profile name, optionally attachment ')' + - '\s+((flags\s*=\s*)?\((?P.+)\)\s*)?\{' + + RE_XATTRS + + '\s+((flags\s*=\s*)?\((?P[^)]+)\)\s*)?\{' + RE_EOL) @@ -110,7 +112,7 @@ def parse_profile_start_line(line, filename): result = {} - for section in [ 'leadingspace', 'plainprofile', 'namedprofile', 'attachment', 'flags', 'comment']: + for section in [ 'leadingspace', 'plainprofile', 'namedprofile', 'attachment', 'xattrs', 'flags', 'comment']: if matches.group(section): result[section] = matches.group(section) diff --git a/utils/test/test-aa.py b/utils/test/test-aa.py index 2bccae256..1efa01960 100644 --- a/utils/test/test-aa.py +++ b/utils/test/test-aa.py @@ -511,32 +511,42 @@ class AaTest_parse_profile_start(AATest): def test_parse_profile_start_01(self): result = self._parse('/foo {', None, None) - expected = ('/foo', '/foo', None, None, False, False, False) + expected = ('/foo', '/foo', None, None, None, False, False, False) self.assertEqual(result, expected) def test_parse_profile_start_02(self): result = self._parse('/foo (complain) {', None, None) - expected = ('/foo', '/foo', None, 'complain', False, False, False) + expected = ('/foo', '/foo', None, None, 'complain', False, False, False) self.assertEqual(result, expected) def test_parse_profile_start_03(self): result = self._parse('profile foo /foo {', None, None) # named profile - expected = ('foo', 'foo', '/foo', None, False, False, False) + expected = ('foo', 'foo', '/foo', None, None, False, False, False) self.assertEqual(result, expected) def test_parse_profile_start_04(self): result = self._parse('profile /foo {', '/bar', '/bar') # child profile - expected = ('/bar', '/foo', None, None, True, True, False) + expected = ('/bar', '/foo', None, None, None, True, True, False) self.assertEqual(result, expected) def test_parse_profile_start_05(self): result = self._parse('/foo//bar {', None, None) # external hat - expected = ('/foo', 'bar', None, None, False, False, True) + expected = ('/foo', 'bar', None, None, None, False, False, True) self.assertEqual(result, expected) def test_parse_profile_start_06(self): result = self._parse('profile "/foo" (complain) {', None, None) - expected = ('/foo', '/foo', None, 'complain', False, False, False) + expected = ('/foo', '/foo', None, None, 'complain', False, False, False) + self.assertEqual(result, expected) + + def test_parse_profile_start_07(self): + result = self._parse('profile "/foo" xattrs=(user.bar=bar) {', None, None) + expected = ('/foo', '/foo', None, 'user.bar=bar', None, False, False, False) + self.assertEqual(result, expected) + + def test_parse_profile_start_08(self): + result = self._parse('profile "/foo" xattrs=(user.bar=bar user.foo=*) {', None, None) + expected = ('/foo', '/foo', None, 'user.bar=bar user.foo=*', None, False, False, False) self.assertEqual(result, expected) def test_parse_profile_start_unsupported_01(self): @@ -566,6 +576,44 @@ class AaTest_parse_profile_data(AATest): # file contains two profiles with the same name parse_profile_data('profile /foo {\n}\nprofile /foo {\n}\n'.split(), 'somefile', False) + def test_parse_xattrs_01(self): + prof = parse_profile_data('/foo xattrs=(user.bar=bar) {\n}\n'.split(), 'somefile', False) + + self.assertEqual(list(prof.keys()), ['/foo']) + self.assertEqual(list(prof['/foo'].keys()), ['/foo']) + self.assertEqual(prof['/foo']['/foo']['name'], '/foo') + self.assertEqual(prof['/foo']['/foo']['filename'], 'somefile') + self.assertEqual(prof['/foo']['/foo']['flags'], None) + self.assertEqual(prof['/foo']['/foo']['xattrs'], 'user.bar=bar') + + def test_parse_xattrs_02(self): + prof = parse_profile_data('/foo xattrs=(user.bar=bar user.foo=*) {\n}\n'.split(), 'somefile', False) + + self.assertEqual(list(prof.keys()), ['/foo']) + self.assertEqual(list(prof['/foo'].keys()), ['/foo']) + self.assertEqual(prof['/foo']['/foo']['name'], '/foo') + self.assertEqual(prof['/foo']['/foo']['filename'], 'somefile') + self.assertEqual(prof['/foo']['/foo']['flags'], None) + self.assertEqual(prof['/foo']['/foo']['xattrs'], 'user.bar=bar user.foo=*') + + def test_parse_xattrs_03(self): + d = '/foo xattrs=(user.bar=bar) flags=(complain) {\n}\n' + prof = parse_profile_data(d.split(), 'somefile', False) + + self.assertEqual(list(prof.keys()), ['/foo']) + self.assertEqual(list(prof['/foo'].keys()), ['/foo']) + self.assertEqual(prof['/foo']['/foo']['name'], '/foo') + self.assertEqual(prof['/foo']['/foo']['filename'], 'somefile') + self.assertEqual(prof['/foo']['/foo']['flags'], 'complain') + self.assertEqual(prof['/foo']['/foo']['xattrs'], 'user.bar=bar') + + def test_parse_xattrs_04(self): + with self.assertRaises(AppArmorException): + # flags before xattrs + d = '/foo flags=(complain) xattrs=(user.bar=bar) {\n}\n' + parse_profile_data(d.split(), 'somefile', False) + + class AaTest_separate_vars(AATest): tests = [ ('' , set() ), @@ -669,11 +717,50 @@ class AaTest_write_header(AATest): embedded_hat = params[1] write_flags = params[2] depth = params[3] - prof_data = { 'flags': params[4], 'attachment': params[5], 'profile_keyword': params[6], 'header_comment': params[7] } + prof_data = { 'flags': params[4], 'attachment': params[5], 'profile_keyword': params[6], 'header_comment': params[7], 'xattrs': '' } result = write_header(prof_data, depth, name, embedded_hat, write_flags) self.assertEqual(result, [expected]) +class AaTest_write_header_01(AATest): + tests = [ + ( + {'name': '/foo', 'write_flags': True, 'depth': 1, 'flags': 'complain'}, + ' /foo flags=(complain) {', + ), + ( + {'name': '/foo', 'write_flags': True, 'depth': 1, 'flags': 'complain', 'profile_keyword': 'profile'}, + ' profile /foo flags=(complain) {', + ), + ( + {'name': '/foo', 'write_flags': True, 'flags': 'complain'}, + '/foo flags=(complain) {', + ), + ( + {'name': '/foo', 'xattrs': 'user.foo=bar', 'write_flags': True, 'flags': 'complain'}, + '/foo xattrs=(user.foo=bar) flags=(complain) {', + ), + ( + {'name': '/foo', 'xattrs': 'user.foo=bar', 'embedded_hat': True}, + 'profile /foo xattrs=(user.foo=bar) {', + ), + ] + + def _run_test(self, params, expected): + name = params['name'] + embedded_hat = params.get('embedded_hat', False) + write_flags = params.get('write_flags', False) + depth = params.get('depth', 0) + prof_data = { + 'xattrs': params.get('xattrs', None), + 'flags': params.get('flags', None), + 'attachment': params.get('attachment', None), + 'profile_keyword': params.get('profile_keyword', None), + 'header_comment': params.get('header_comment', None), + } + result = write_header(prof_data, depth, name, embedded_hat, write_flags) + self.assertEqual(result, [expected]) + class AaTest_get_file_perms_1(AATest): tests = [ ('/usr/share/common-licenses/foo/bar', {'allow': {'all': set(), 'owner': {'w'} }, 'deny': {'all':set(), 'owner': set()}, 'paths': {'/usr/share/common-licenses/**'} }),