diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 134bdd23c..13aebbe74 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,3 +1,9 @@ +spec: + inputs: + build-test-images: + default: false + type: boolean + description: Explicitly build virtual machine images used by integration tests. --- image: ubuntu:latest @@ -269,8 +275,7 @@ image-ubuntu-cloud-24.04-x86_64: - .image-garden.mk - .gitlab-ci.yml compare_to: "refs/heads/master" - - if: $CI_COMMIT_BRANCH - when: manual + - if: $CI_COMMIT_BRANCH && "$[[ inputs.build-test-images ]]" == "true" .spread-x86_64: extends: .image-garden-x86_64 diff --git a/libraries/libapparmor/swig/python/test/Makefile.am b/libraries/libapparmor/swig/python/test/Makefile.am index 761735a9a..7a7e0c4f4 100644 --- a/libraries/libapparmor/swig/python/test/Makefile.am +++ b/libraries/libapparmor/swig/python/test/Makefile.am @@ -15,6 +15,7 @@ PYTHON_DIST_BUILD_PATH = '$(builddir)/../build/$$($(PYTHON) buildpath.py)' TESTS = test_python.py TESTS_ENVIRONMENT = \ LD_LIBRARY_PATH='$(top_builddir)/src/.libs:$(PYTHON_DIST_BUILD_PATH)' \ - PYTHONPATH='$(PYTHON_DIST_BUILD_PATH)' + PYTHONPATH='$(PYTHON_DIST_BUILD_PATH)' \ + PYTHONDONTWRITEBYTECODE='1' endif diff --git a/profiles/apparmor.d/dig b/profiles/apparmor.d/dig new file mode 100644 index 000000000..baeb7e05e --- /dev/null +++ b/profiles/apparmor.d/dig @@ -0,0 +1,46 @@ +#------------------------------------------------------------------ +# Copyright (C) 2025 Canonical Ltd. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of version 2 of the GNU General Public +# License published by the Free Software Foundation. +#------------------------------------------------------------------ +# vim: ft=apparmor +# + +abi , + +include + +profile dig /usr/bin/dig { + include + include + include + include + + /usr/bin/dig mr, + + network inet dgram, + network inet6 dgram, + network inet stream, + network inet6 stream, + + capability dac_override, + capability dac_read_search, + + # +trace + network (create,bind,getattr,send,receive) netlink raw, + + file r /proc/version_signature, + + # -f, -k, +tls-ca, +tls-certfile, +tls-keyfile + file r @{HOME}/[^.]**, + owner rw @{HOME}/.dig/**, + + ## denied by private-files-strict + priority=1 owner r @{HOME}/.digrc, + + # Site-specific additions and overrides. See local/README for details. + include if exists +} + diff --git a/profiles/apparmor.d/fusermount3 b/profiles/apparmor.d/fusermount3 index 39c99eced..7e34ac8f8 100644 --- a/profiles/apparmor.d/fusermount3 +++ b/profiles/apparmor.d/fusermount3 @@ -11,11 +11,12 @@ profile fusermount3 /usr/bin/fusermount3 { # Allow both rw and ro type mounts (e.g. AppImage uses ro) #MS_DIRSYNC, MS_NOATIME, MS_NODIRATIME, MS_NOEXEC, MS_SYNCHRONOUS, MS_NOSYMFOLLOW - mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,sync) -> @{HOME}/**/, - mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,sync) -> /mnt/{,**/}, - mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,sync) -> @{run}/user/@{uid}/**/, - mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,sync) -> /media/**/, - mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,sync) -> /tmp/**/, + # Below broad mount flags should be revisited once we have rule delegation + mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,noexec,sync) -> @{HOME}/**/, + mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,noexec,sync) -> /mnt/{,**/}, + mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,noexec,sync) -> @{run}/user/@{uid}/**/, + mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,noexec,sync) -> /media/**/, + mount fstype=@{fuse_types} options=(nosuid,nodev) options in (ro,rw,noatime,dirsync,nodiratime,noexec,sync) -> /tmp/**/, # Cern VM fs is special and only uses these exact flags mount fstype=@{fuse_types} options=(nosuid,nodev,ro) -> /cvmfs/**/, diff --git a/profiles/apparmor.d/usr.bin.hwctl b/profiles/apparmor.d/usr.bin.hwctl index 6fae491bd..420d3e311 100644 --- a/profiles/apparmor.d/usr.bin.hwctl +++ b/profiles/apparmor.d/usr.bin.hwctl @@ -17,6 +17,7 @@ profile hwctl /usr/bin/hwctl { include include include + include network inet dgram, network inet6 dgram, @@ -28,21 +29,14 @@ profile hwctl /usr/bin/hwctl { /sys/firmware/dmi/tables/* r, # for collecting SMBIOS info /sys/devices/system/cpu/cpufreq/policy*/cpuinfo_max_freq r, - /sys/fs/cgroup/**/cpu.max r, + /sys/fs/cgroup/{,**/}cpu.max r, @{PROC}/version r, @{PROC}/@{pid}/cgroup r, # for collecting OS information - /usr/bin/{dpkg,kmod} cx, - /usr/bin/lsb_release Px -> lsb_release, - - profile dpkg /usr/bin/dpkg { - include - - @{exec_path} r, - /etc/dpkg/** r, - } + /usr/bin/kmod cx, + /etc/os-release r, profile kmod /usr/bin/kmod { include diff --git a/tests/regression/apparmor/mount.sh b/tests/regression/apparmor/mount.sh index a9d2ff72f..361889d0d 100755 --- a/tests/regression/apparmor/mount.sh +++ b/tests/regression/apparmor/mount.sh @@ -212,6 +212,18 @@ test_nonfs_options_equals_in() { remove_mnt } +test_nonfs_options_equals_in_ext() { + # args: option=$1, option in $2, $3=pass/fail, mount -o $4 + if [ "$(parser_supports "mount options=($1) options in ($2),")" != "true" ] ; then + echo " not supported by parser - skipping mount options=($1) options in ($2)," + return + fi + + genprofile cap:sys_admin "mount:options=($1) options in ($2)" + runchecktest "MOUNT (confined cap mount option=$1 option in $2 ($4))" $3 mount ${loop_device} ${mount_point} -o $4 + remove_mnt +} + test_dir_options() { if [ "$(parser_supports "mount options=($1),")" != "true" ] ; then echo " not supported by parser - skipping mount option=($1)," @@ -288,6 +300,9 @@ test_options() { # TODO: expand this to cover more mount flag combinations test_nonfs_options_equals_in 'nosuid,nodev' 'noatime,noexec' + test_nonfs_options_equals_in_ext 'nosuid,nodev' 'noatime' 'fail' 'nosuid,nodev,noexec' + test_nonfs_options_equals_in_ext 'nosuid,nodev' 'noatime' 'fail' 'nosuid,nodev,noatime,noexec' + for i in "bind" "rbind" "move"; do test_dir_options $i done diff --git a/utils/Makefile b/utils/Makefile index f406e08e3..1e3c36b81 100644 --- a/utils/Makefile +++ b/utils/Makefile @@ -22,7 +22,7 @@ include $(COMMONDIR)/Make.rules PYTOOLS = aa-easyprof aa-genprof aa-logprof aa-cleanprof aa-mergeprof \ aa-autodep aa-audit aa-complain aa-enforce aa-disable \ - aa-notify aa-unconfined + aa-notify aa-unconfined aa-show-usage TOOLS = ${PYTOOLS} aa-decode aa-remove-unknown PYSETUP = python-tools-setup.py PYMODULES = $(wildcard apparmor/*.py apparmor/rule/*.py) diff --git a/utils/aa-notify b/utils/aa-notify index 119a97721..d8f36050f 100755 --- a/utils/aa-notify +++ b/utils/aa-notify @@ -850,7 +850,7 @@ def main(): filter_group.add_argument('--filter.family', metavar='FAMILY', help=_('regular expression to match the network family')) filter_group.add_argument('--filter.socket', metavar='SOCKET', help=_('regular expression to match the network socket type')) - # If a TTY then assume running in test mode and fix output width + # If not a TTY then assume running in test mode and fix output width if not sys.stdout.isatty(): parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80) diff --git a/utils/aa-show-usage b/utils/aa-show-usage new file mode 100755 index 000000000..8950576c1 --- /dev/null +++ b/utils/aa-show-usage @@ -0,0 +1,155 @@ +#! /usr/bin/python3 +# ---------------------------------------------------------------------- +# Copyright (C) 2025 Maxime Bélair +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of version 2 of the GNU General Public +# License as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# ---------------------------------------------------------------------- +# + +import argparse +import glob +import re +import json +import sys +import os + +from apparmor import aa +from apparmor.translations import init_translation +from apparmor.regex import expand_braces, resolve_variables + +_ = init_translation() + +MAX_RECURSION = 10 + + +def has_matching_file(pattern, xattrs=None): + for p in expand_braces(pattern): + for path in glob.iglob(p, recursive=True): + if os.path.realpath(path) != os.path.abspath(path): # remove symlinks + continue + if not xattrs or all(os.getxattr(path, name) == val for name, val in xattrs.items()): + return path + return None + + +def display_profile_text(used, unused, show_matching_path): + if used: + print(_('Used profiles:')) + for (name, attach, path, match) in used: + print(_(' Profile {} for {} ({}) {}').format(name, attach, path, ('→ ' + match) if show_matching_path else '')) + if unused: + print(_('Unused profiles:')) + for (name, attach, path, match) in unused: + print(_(' Profile {} for {} ({}) ').format(name, attach, path)) + + +def profiles_to_json(profiles): + result = [] + for profile_name, attach, path, matching_path in profiles: + entry = {'name': profile_name, 'attach': attach, 'path': path} + if matching_path: + entry['matching_path'] = matching_path + result.append(entry) + return result + + +def display_profile_json(used, unused): + profiles = {} + profiles['version'] = 1 # JSON format version - increase if you change the json structure + if used: + profiles['used'] = profiles_to_json(used) + if unused: + profiles['unused'] = profiles_to_json(unused) + + print(json.dumps(profiles, indent=2)) + + +def filter_profile(path, profile_name, attach, prof_filter): + if prof_filter['flags'] and not prof_filter['flags'].match(aa.active_profiles.profiles[profile_name].data['flags'] or ''): + return False + if prof_filter['name'] and not prof_filter['name'].match(profile_name or ''): + return False + if prof_filter['attach'] and not prof_filter['attach'].match(attach or ''): + return False + if prof_filter['path'] and not prof_filter['path'].match(path or ''): + return False + + return True + + +def get_used_profiles(args, prof_filter): + aa.init_aa(confdir=args.configdir or os.getenv('__AA_CONFDIR'), profiledir=args.dir) + aa.read_profiles() + used = [] + unused = [] + + for a, v in aa.active_profiles.attachments.items(): + filename = v['f'] + profile_name = v['p'] + if not filter_profile(filename, profile_name, a, prof_filter): + continue + + var_dict = aa.active_profiles.get_all_merged_variables(filename, aa.include_list_recursive(aa.active_profiles.files[filename], True)) + resolved = resolve_variables(a, var_dict) + matching_path = None + for entry in resolved: + matching_path = has_matching_file(entry) + if matching_path: + break + + if matching_path and args.show_type != 'unused': + used.append((profile_name, a, filename, matching_path)) + if not matching_path and args.show_type != 'used': + unused.append((profile_name, a, filename, matching_path)) + + return used, unused + + +def main(): + parser = argparse.ArgumentParser(description=_('Check which profiles are used')) + parser.add_argument('-s', '--show-type', type=str, default='all', choices=['all', 'used', 'unused'], help=_('Type of profiles to show')) + parser.add_argument('-j', '--json', action='store_true', help=_('Output in JSON')) + parser.add_argument('-d', '--dir', type=str, help=_('Path to profiles')) + parser.add_argument('--show-matching-path', action='store_true', help=_('Show the path of a file matching the profile')) + parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS) + + filter_group = parser.add_argument_group(_('Filtering options'), + description=(_('Filters are used to reduce the output of information to only ' + 'those entries that will match the filter. Filters use Python\'s regular ' + 'expression syntax.'))) + filter_group.add_argument('--filter.flags', dest='filter_flags', metavar='FLAGS', help=_('Filter by flags')) + filter_group.add_argument('--filter.profile_name', dest='filter_name', metavar='PROFILE_NAME', help=_('Filter by profile name')) + filter_group.add_argument('--filter.profile_attach', dest='filter_attach', metavar='PROFILE_ATTACH', help=_('Filter by profile attachment')) + filter_group.add_argument('--filter.profile_path', dest='filter_path', metavar='PROFILE_PATH', help=_('Filter by profile path')) + + # If not a TTY then assume running in test mode and fix output width + if not sys.stdout.isatty(): + parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80) + + args = parser.parse_args() + + prof_filter = { + 'flags': re.compile(args.filter_flags) if args.filter_flags else None, + 'name': re.compile(args.filter_name) if args.filter_name else None, + 'attach': re.compile(args.filter_attach) if args.filter_attach else None, + 'path': re.compile(args.filter_path) if args.filter_path else None, + } + + used, unused = get_used_profiles(args, prof_filter) + + if args.json: + display_profile_json(used, unused) + else: + display_profile_text(used, unused, args.show_matching_path) + + +if __name__ == '__main__': + main() diff --git a/utils/aa-show-usage.pod b/utils/aa-show-usage.pod new file mode 100644 index 000000000..9eab60d5d --- /dev/null +++ b/utils/aa-show-usage.pod @@ -0,0 +1,106 @@ +# This publication is intellectual property of Canonical Ltd. Its contents +# can be duplicated, either in part or in whole, provided that a copyright +# label is visibly located on each copy. +# +# All information found in this book has been compiled with utmost +# attention to detail. However, this does not guarantee complete accuracy. +# Neither Canonical Ltd, the authors, nor the translators shall be held +# liable for possible errors or the consequences thereof. +# +# Many of the software and hardware descriptions cited in this book +# are registered trademarks. All trade names are subject to copyright +# restrictions and may be registered trade marks. Canonical Ltd +# essentially adheres to the manufacturer's spelling. +# +# Names of products and trademarks appearing in this book (with or without +# specific notation) are likewise subject to trademark and trade protection +# laws and may thus fall under copyright restrictions. +# + + +=pod + +=head1 NAME + +aa-show-usage - Check which profiles are used + +=head1 SYNOPSIS + +B [option] + +=head1 DESCRIPTION + +B will display data on the usage of the profiles of the system + +=head1 OPTIONS + +B accepts the following arguments: + +=over 4 + +=item -s, --show-type {all,used,unused} + +Type of profiles to show + +=item -j, --json + +Output in JSON + +=item -d, --dir + +Path to profiles + +=item --show-matching-path + +Show the path of a file matching the profile. Only the first matching path of an executable is shown (not the whole list). + +=back + +=head1 FILTERING OPTIONS + +B accepts the following filters to reduce the output of information to only those entries that will match the filter. Filters use Python's regular expression syntax. + +=over 4 + +=item --filter.flags FLAGS + +Filter by flags + +=item --filter.profile_name PROFILE_NAME + +Filter by profile name + +=item --filter.profile_attach PROFILE_ATTACH + +Filter by profile attachement (i.e. by path of the executable to which this profile applies) + +=item --filter.profile_path PROFILE_PATH + +Filter by profile path + +=back + +=head1 EXAMPLES + +Show both the list of used and unused profiles in your system. + + $ aa-show-usage + +Show the list of unconfined profiles currently used by your system. + + $ aa-show-usage --show-type=used --filter.flags=unconfined + +=head1 BUGS + +B needs to be able to read profiles to tell whether they are used in practice. + +B will only report directly used profiles. Profiles used via profile transitions, systemd's B, or API call such as B are not shown as used. + + +If you find any additional bugs, please report them at L. + +=head1 SEE ALSO + +apparmor(7) + +=cut diff --git a/utils/apparmor/aa.py b/utils/apparmor/aa.py index f273bd005..94e66efc0 100644 --- a/utils/apparmor/aa.py +++ b/utils/apparmor/aa.py @@ -37,7 +37,7 @@ from apparmor.profile_storage import ProfileStorage, add_or_remove_flag, ruletyp from apparmor.regex import ( RE_HAS_COMMENT_SPLIT, RE_PROFILE_CHANGE_HAT, RE_PROFILE_CONDITIONAL, RE_PROFILE_CONDITIONAL_BOOLEAN, RE_PROFILE_CONDITIONAL_VARIABLE, RE_PROFILE_END, - RE_PROFILE_HAT_DEF, RE_PROFILE_START, + RE_PROFILE_HAT_DEF, RE_PROFILE_START, RE_METADATA_LOGPROF_SUGGEST, RE_RULE_HAS_COMMA, parse_profile_start_line, re_match_include) from apparmor.rule.abi import AbiRule from apparmor.rule.file import FileRule @@ -1420,8 +1420,20 @@ def match_includes(profile, rule_type, rule_obj): # XXX type check should go away once we init all profiles correctly if valid_include(incname) and include[incname][incname][rule_type].is_covered(rule_obj): - if include[incname][incname]['logprof_suggest'] != 'no': + sug = include[incname][incname]['logprof_suggest'].split() + if sug == []: newincludes.append(rel_incname) + elif sug[0] == 'no': + continue + else: + for s in sug: + try: + if re.match(s, profile.data['name']): + newincludes.append(rel_incname) + break + except re.error as err: + aaui.UI_Important(_('WARNING: Invalid regex \'%s\' in abstraction %s: %s.' + % (s, rel_incname, err))) return newincludes @@ -1832,10 +1844,11 @@ def parse_profile_data(data, file, do_include, in_preamble): else: initial_comment = initial_comment + line + '\n' - if line.startswith('# LOGPROF-SUGGEST:'): # TODO: allow any number of spaces/tabs after '#' - parts = line.split() - if len(parts) > 2: - profile_data[profname]['logprof_suggest'] = parts[2] + if RE_METADATA_LOGPROF_SUGGEST.search(line): + # - logprof_suggest is a set of space-separated regexes + # - If this metadata is present, the abstraction is only proposed to logprof if at least one regex is matched + # - If this abstraction should not be proposed to any profile, it is possible to tell #LOGPROF-SUGGEST: no + profile_data[profname]['logprof_suggest'] = RE_METADATA_LOGPROF_SUGGEST.search(line).group('suggest') # keep line as part of initial_comment (if we ever support writing abstractions, we should update serialize_profile()) initial_comment = initial_comment + line + '\n' diff --git a/utils/apparmor/common.py b/utils/apparmor/common.py index da426acd8..7fadcf33f 100644 --- a/utils/apparmor/common.py +++ b/utils/apparmor/common.py @@ -15,6 +15,7 @@ import logging import os import re import subprocess +import signal import sys import termios import tty @@ -111,18 +112,35 @@ def recursive_print(src, dpth=0, key=''): print(tabs + '- %s' % src) -def cmd(command): - """Try to execute the given command.""" +def subprocess_setup(): + # Python installs a SIGPIPE handler by default. This is usually not what + # non-Python subprocesses expect. + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + + +def cmd_pipe_stderr(command, stdin=None): + return cmd(command, stderr=subprocess.PIPE, output_fmt='utf-8', stdin=stdin) + + +def cmd(command, stderr=subprocess.STDOUT, output_fmt='ascii', stdin=None): + """Execute the given command and return its output as a tuple (returncode, output).""" debug(command) try: - sp = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + sp = subprocess.Popen(command, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup) except OSError as ex: return [127, str(ex)] - out = sp.communicate()[0].decode('ascii', 'ignore') + out, err = sp.communicate() # TODO input??? - return [sp.returncode, out] + if err: + out = err + + if output_fmt == 'ascii': + decoded = out.decode(output_fmt, 'ignore') + else: + decoded = out.decode(output_fmt) + + return [sp.returncode, decoded] def cmd_pipe(command1, command2): diff --git a/utils/apparmor/easyprof.py b/utils/apparmor/easyprof.py index 1f87bfa58..1a4293085 100644 --- a/utils/apparmor/easyprof.py +++ b/utils/apparmor/easyprof.py @@ -14,12 +14,11 @@ import json import optparse import os import re -import subprocess import sys from shutil import which from tempfile import NamedTemporaryFile -from apparmor.common import AppArmorException, open_file_read +from apparmor.common import AppArmorException, open_file_read, cmd DEBUGGING = False @@ -55,19 +54,6 @@ def msg(out, output=sys.stdout): pass -def cmd(command): - """Try to execute the given command.""" - debug(command) - try: - sp = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - except OSError as ex: - return [127, str(ex)] - - out = sp.communicate()[0] - return [sp.returncode, out] - - def debug(out): """Print debug message""" if DEBUGGING: diff --git a/utils/apparmor/profile_storage.py b/utils/apparmor/profile_storage.py index 40306d150..26fd91ee8 100644 --- a/utils/apparmor/profile_storage.py +++ b/utils/apparmor/profile_storage.py @@ -13,9 +13,10 @@ # # ---------------------------------------------------------------------- +from types import NoneType from apparmor.common import AppArmorBug, AppArmorException -from apparmor.regex import parse_profile_start_line +from apparmor.regex import parse_profile_start_line, re_print_dict from apparmor.rule import quote_if_needed from apparmor.rule.abi import AbiRule, AbiRuleset from apparmor.rule.all import AllRule, AllRuleset @@ -79,7 +80,7 @@ class ProfileStorage: data['parent'] = '' # parent profile, or '' for top-level profiles and external hats data['name'] = '' data['attachment'] = '' - data['xattrs'] = '' + data['xattrs'] = {} data['flags'] = '' data['external'] = False data['header_comment'] = '' # comment in the profile/hat start line @@ -100,28 +101,16 @@ class ProfileStorage: if key not in self.data: raise AppArmorBug('attempt to set unknown key %s' % key) - # allow writing bool values - if isinstance(self.data[key], bool): - if isinstance(value, bool): + allowed_types = {bool, str, dict, None, NoneType} + old_type = type(self.data[key]) + if old_type in allowed_types: + if key in {'flags', 'filename'} and type(value) in {str, NoneType}: + self.data[key] = value + elif isinstance(value, old_type): self.data[key] = value else: - raise AppArmorBug('Attempt to change type of "%s" from %s to %s, value %s' % (key, type(self.data[key]), type(value), value)) - - # allow writing str or None to some keys - elif key in ('flags', 'filename'): - if isinstance(value, str) or value is None: - self.data[key] = value - else: - raise AppArmorBug('Attempt to change type of "%s" from %s to %s, value %s' % (key, type(self.data[key]), type(value), value)) - - # allow writing str values - elif isinstance(self.data[key], str): - if isinstance(value, str): - self.data[key] = value - else: - raise AppArmorBug('Attempt to change type of "%s" from %s to %s, value %s' % (key, type(self.data[key]), type(value), value)) - - # don't allow overwriting of other types + raise AppArmorBug('Attempt to change type of "%s" from %s to %s, value %s' % (key, old_type, type(value), value)) + self.data[key] = value else: raise AppArmorBug('Attempt to overwrite "%s" with %s, type %s' % (key, value, type(value))) @@ -168,7 +157,7 @@ class ProfileStorage: xattrs = '' if self.data['xattrs']: - xattrs = ' xattrs=(%s)' % self.data['xattrs'] + xattrs = ' xattrs=(%s)' % re_print_dict(self.data['xattrs']) flags = '' if self.data['flags']: @@ -263,7 +252,7 @@ class ProfileStorage: else: prof_storage['profile_keyword'] = matches['profile_keyword'] prof_storage['attachment'] = matches['attachment'] or '' - prof_storage['xattrs'] = matches['xattrs'] or '' + prof_storage['xattrs'] = matches['xattrs'] or {} return (profile, hat, prof_storage) diff --git a/utils/apparmor/regex.py b/utils/apparmor/regex.py index 535918b17..a7a93ea22 100644 --- a/utils/apparmor/regex.py +++ b/utils/apparmor/regex.py @@ -14,6 +14,7 @@ # ---------------------------------------------------------------------- import re +import itertools from apparmor.common import AppArmorBug, AppArmorException from apparmor.translations import init_translation @@ -27,12 +28,16 @@ RE_COMMA_EOL = r'\s*,' + RE_EOL # optional whitespace, comma + RE_EOL RE_PROFILE_NAME = r'(?P<%s>(\S+|"[^"]+"))' # string without spaces, or quoted string. %s is the match group name RE_PATH = r'/\S*|"/[^"]*"' # filename (starting with '/') without spaces, or quoted filename. +RE_VAR = r'@{[^}\s]+}' +RE_DICT_ENTRY = r'\s*(?P[^,\s=]+)(?:=(?P[^,\s=]+))?\s*' RE_PROFILE_PATH = '(?P<%s>(' + RE_PATH + '))' # quoted or unquoted filename. %s is the match group name -RE_PROFILE_PATH_OR_VAR = '(?P<%s>(' + RE_PATH + r'|@{\S+}\S*|"@{\S+}[^"]*"))' # quoted or unquoted filename or variable. %s is the match group name +RE_PROFILE_PATH_OR_VAR = '(?P<%s>(' + RE_PATH + '|' + RE_VAR + r'\S*|"' + RE_VAR + '[^"]*"))' # quoted or unquoted filename or variable. %s is the match group name RE_SAFE_OR_UNSAFE = '(?P(safe|unsafe))' -RE_XATTRS = r'(\s+xattrs\s*=\s*\((?P([^)=]+(=[^)=]+)?\s?)+)\)\s*)?' +RE_XATTRS = r'(\s+xattrs\s*=\s*\((?P([^)=]+(=[^)=]+)?\s?)*)\)\s*)?' RE_FLAGS = r'(\s+(flags\s*=\s*)?\((?P[^)]+)\))?' +RE_VARIABLE = re.compile(RE_VAR) + RE_PROFILE_END = re.compile(r'^\s*\}' + RE_EOL) RE_PROFILE_ALL = re.compile(RE_PRIORITY_AUDIT_DENY + r'all' + RE_COMMA_EOL) RE_PROFILE_CAP = re.compile(RE_PRIORITY_AUDIT_DENY + r'capability(?P(\s+\S+)+)?' + RE_COMMA_EOL) @@ -56,6 +61,8 @@ RE_PROFILE_USERNS = re.compile(RE_PRIORITY_AUDIT_DENY + r'(userns\s*,|userns(?P< RE_PROFILE_MQUEUE = re.compile(RE_PRIORITY_AUDIT_DENY + r'(mqueue\s*,|mqueue(?P
\s+[^#]*)\s*,)' + RE_EOL) RE_PROFILE_IO_URING = re.compile(RE_PRIORITY_AUDIT_DENY + r'(io_uring\s*,|io_uring(?P
\s+[^#]*)\s*,)' + RE_EOL) +RE_METADATA_LOGPROF_SUGGEST = re.compile(r'^\s*#\s*LOGPROF-SUGGEST\s*:\s*(?P.*)$') + # match anything that's not " or #, or matching quotes with anything except quotes inside __re_no_or_quoted_hash = '([^#"]|"[^"]*")*' @@ -162,6 +169,10 @@ def parse_profile_start_line(line, filename): else: result['profile'] = result['namedprofile'] result['profile_keyword'] = True + if 'xattrs' in result: + result['xattrs'] = re_parse_dict(result['xattrs']) + else: + result['xattrs'] = {} return result @@ -235,6 +246,30 @@ def re_match_include(line): return None +def re_parse_dict(raw): + """returns a dict where entries are comma or space separated""" + result = {} + if not raw: + return result + + for key, value in re.findall(RE_DICT_ENTRY, raw): + if value == '': + value = None + result[key] = value + + return result + + +def re_print_dict(d): + parts = [] + for k, v in sorted(d.items()): + if v: + parts.append("{}={}".format(k, v)) + else: + parts.append(k) + return " ".join(parts) + + def strip_parenthesis(data): """strips parenthesis from the given string and returns the strip()ped result. The parenthesis must be the first and last char, otherwise they won't be removed. @@ -251,3 +286,93 @@ def strip_quotes(data): return data[1:-1] else: return data + + +def expand_var(var, var_dict, seen_vars): + if var in seen_vars: + raise AppArmorException(_('Circular dependency detected for variable {}').format(var)) + + if var not in var_dict: + raise AppArmorException(_('Trying to reference non-existing variable {}').format(var)) + + resolved = [] + for val in var_dict[var]: + resolved.extend(expand_string(val, var_dict, seen_vars | {var})) + return resolved + + +def expand_string(s, var_dict, seen_vars): + + matches = list(RE_VARIABLE.finditer(s)) + if not matches: + return [s] + + parts = [] + last_idx = 0 + for match in matches: + start, end = match.span() + if start > last_idx: + parts.append([s[last_idx:start]]) + + var_name = match.group(0) + parts.append(expand_var(var_name, var_dict, seen_vars)) + last_idx = end + + if last_idx < len(s): + parts.append([s[last_idx:]]) + + return [''.join(p) for p in itertools.product(*parts)] + + +def resolve_variables(s, var_dict): + return expand_string(s, var_dict, set()) + + +# This function could be replaced by braceexpand.braceexpand +# It exists to avoid relying on an external python package. +def expand_braces(s): + i = s.find('{') + if i == -1: + if '}' in s: + raise AppArmorException('Unbalanced braces in pattern {}'.format(s)) + return [s] + + level = 0 + for j in range(i, len(s)): + if s[j] == '{': + level += 1 + elif s[j] == '}': + level -= 1 + if level == 0: + break + else: + raise AppArmorException('Unbalanced braces in pattern {}'.format(s)) + + prefix = s[:i] + group = s[i + 1:j] + suffix = s[j + 1:] + + # Split group on commas at the top level (i.e. not inside nested braces) + alts = [] + curr = '' + nested = 0 + for char in group: + if char == ',' and nested == 0: + alts.append(curr) + curr = "" + else: + if char == '{': + nested += 1 + elif char == '}': + nested -= 1 + curr += char + alts.append(curr) + + # Recursively combine prefix, each alternative, and suffix + results = [] + for alt in alts: + for expansion in expand_braces(prefix + alt + suffix): + results.append(expansion) + if len(results) <= 1: + raise AppArmorException('Braces should provide at least two alternatives, found {}: {}'.format(len(results), s)) + return results diff --git a/utils/test/cleanprof_test.out b/utils/test/cleanprof_test.out index bce58192e..aa501b1b1 100644 --- a/utils/test/cleanprof_test.out +++ b/utils/test/cleanprof_test.out @@ -78,7 +78,7 @@ $bar = true allow /home/foo/bar r, } -/what/ever/xattr xattrs=( foo=bar ) flags=( complain ) { +/what/ever/xattr xattrs=(foo=bar) flags=( complain ) { /what/ever r, } diff --git a/utils/test/test-aa-decode.py b/utils/test/test-aa-decode.py index 10589ed1f..d3583c5b8 100755 --- a/utils/test/test-aa-decode.py +++ b/utils/test/test-aa-decode.py @@ -10,62 +10,22 @@ # ------------------------------------------------------------------ import os -import signal -import subprocess import unittest from tempfile import NamedTemporaryFile - +from apparmor.common import cmd_pipe_stderr # The location of the aa-decode utility can be overridden by setting # the APPARMOR_DECODE environment variable; this is useful for running # these tests in an installed environment aadecode_bin = "../aa-decode" -# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html -# This is needed so that the subprocesses that produce endless output -# actually quit when the reader goes away. -def subprocess_setup(): - # Python installs a SIGPIPE handler by default. This is usually not what - # non-Python subprocesses expect. - signal.signal(signal.SIGPIPE, signal.SIG_DFL) - - -# Define only arguments that are actually ever used: command and stdin -def cmd(command, stdin=None): - """Try to execute given command (array) and return its stdout, or return - a textual error if it failed.""" - - try: - sp = subprocess.Popen( - command, - stdin=stdin, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - preexec_fn=subprocess_setup - ) - except OSError as e: - return 127, str(e) - - stdout, stderr = sp.communicate(input) - - # If there was some error output, show that instead of stdout to ensure - # test fails and does not mask potentially major warnings and errors. - if stderr: - out = stderr - else: - out = stdout - - return sp.returncode, out.decode('utf-8') - - class AADecodeTest(unittest.TestCase): def test_help(self): """Test --help argument""" expected = 0 - rc, report = cmd((aadecode_bin, "--help")) + rc, report = cmd_pipe_stderr((aadecode_bin, "--help")) result = 'Got exit code {}, expected {}\n'.format(rc, expected) self.assertEqual(expected, rc, result + report) @@ -80,7 +40,7 @@ class AADecodeTest(unittest.TestCase): temp_file.write(content) temp_file.flush() temp_file.seek(0) - rc, report = cmd((aadecode_bin,), stdin=temp_file) + rc, report = cmd_pipe_stderr(aadecode_bin, stdin=temp_file) result = 'Got exit code {}, expected {}\n'.format(rc, expected_return_code) self.assertEqual(expected_return_code, rc, result + report) @@ -95,7 +55,7 @@ class AADecodeTest(unittest.TestCase): expected_output = 'Decoded: /tmp/foo bar' test_code = '2F746D702F666F6F20626172' - rc, report = cmd((aadecode_bin, test_code)) + rc, report = cmd_pipe_stderr((aadecode_bin, test_code)) result = 'Got exit code {}, expected {}\n'.format(rc, expected) self.assertEqual(expected, rc, result + report) result = 'Got output "{}", expected "{}"\n'.format(report, expected_output) diff --git a/utils/test/test-aa-notify.py b/utils/test/test-aa-notify.py index d1e75e701..8f40e548f 100644 --- a/utils/test/test-aa-notify.py +++ b/utils/test/test-aa-notify.py @@ -13,7 +13,6 @@ import os import pwd import signal -import subprocess import sys import time import unittest @@ -21,6 +20,7 @@ from tempfile import NamedTemporaryFile from datetime import datetime import apparmor.aa as aa +from apparmor.common import cmd_pipe_stderr from common_test import AATest, setup_aa, setup_all_loops # The location of the aa-notify utility can be overridden by setting @@ -38,34 +38,6 @@ def subprocess_setup(): signal.signal(signal.SIGPIPE, signal.SIG_DFL) -def cmd(command): - """Try to execute given command (array) and return its stdout, or return - a textual error if it failed.""" - - try: - sp = subprocess.Popen( - command, - stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - preexec_fn=subprocess_setup - ) - except OSError as e: - return 127, str(e) - - stdout, stderr = sp.communicate(input) - - # If there was some error output, show that instead of stdout to ensure - # test fails and does not mask potentially major warnings and errors. - if stderr: - out = stderr - else: - out = stdout - - return sp.returncode, out.decode('utf-8') - - class AANotifyBase(AATest): def create_logfile_contents(_time): @@ -141,7 +113,7 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc if 'SUDO_USER' in os.environ: username = os.environ.get('SUDO_USER') - return_code, output = cmd(['last', username, '--fullnames', '--time-format', 'iso']) + return_code, output = cmd_pipe_stderr(['last', username, '--fullnames', '--time-format', 'iso']) output = output.split('\n')[0] # the first line is enough # example of output (util-linux last command): # ubuntu tty7 :0 2024-01-05T14:29:11-03:00 gone - no logout @@ -184,7 +156,7 @@ class AANotifyTest(AANotifyBase): expected_return_code = 0 expected_output_has = 'usage: aa-notify' - return_code, output = cmd(aanotify_bin) + return_code, output = cmd_pipe_stderr(aanotify_bin) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) @@ -262,7 +234,7 @@ Filtering options: for patch in patches: expected_output_2 = expected_output_2.replace(patch[0], patch[1]) - return_code, output = cmd(aanotify_bin + ['--help']) + return_code, output = cmd_pipe_stderr(aanotify_bin + ['--help']) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) @@ -275,7 +247,7 @@ Filtering options: expected_return_code = 0 expected_output_has = 'AppArmor denials: 20 (since' - return_code, output = cmd(aanotify_bin + ['-f', self.test_logfile_current, '-s', '100']) + return_code, output = cmd_pipe_stderr(aanotify_bin + ['-f', self.test_logfile_current, '-s', '100']) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) @@ -288,7 +260,7 @@ Filtering options: expected_return_code = 0 expected_output_has = 'AppArmor denials: 10 (since' - return_code, output = cmd(aanotify_bin + ['-f', self.test_logfile_last_login, '-l']) + return_code, output = cmd_pipe_stderr(aanotify_bin + ['-f', self.test_logfile_last_login, '-l']) if "ERROR: Could not find last login" in output: self.skipTest('Could not find last login') result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) @@ -364,7 +336,7 @@ Logfile: {logfile} AppArmor denials: 10 (since'''.format(logfile=self.test_logfile_last_login) # noqa: E128 - return_code, output = cmd(aanotify_bin + ['-f', self.test_logfile_last_login, '-l', '-v']) + return_code, output = cmd_pipe_stderr(aanotify_bin + ['-f', self.test_logfile_last_login, '-l', '-v']) if "ERROR: Could not find last login" in output: self.skipTest('Could not find last login') result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) @@ -399,7 +371,7 @@ class AANotifyProfileFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + days_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + days_params + params) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) @@ -430,7 +402,7 @@ class AANotifyProfileFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + login_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + login_params + params) if 'ERROR: Could not find last login' in output: self.skipTest('Could not find last login') result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) @@ -463,7 +435,7 @@ class AANotifyOperationFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + days_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + days_params + params) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) @@ -492,7 +464,7 @@ class AANotifyOperationFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + login_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + login_params + params) if 'ERROR: Could not find last login' in output: self.skipTest('Could not find last login') result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) @@ -528,7 +500,7 @@ class AANotifyNameFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + days_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + days_params + params) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) @@ -560,7 +532,7 @@ class AANotifyNameFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + login_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + login_params + params) if 'ERROR: Could not find last login' in output: self.skipTest('Could not find last login') result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) @@ -593,7 +565,7 @@ class AANotifyDeniedFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + days_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + days_params + params) result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) self.assertEqual(expected_return_code, return_code, result + output) result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) @@ -622,7 +594,7 @@ class AANotifyDeniedFilterTest(AANotifyBase): expected_return_code = expected[0] expected_output_has = expected[1] - return_code, output = cmd(aanotify_bin + login_params + params) + return_code, output = cmd_pipe_stderr(aanotify_bin + login_params + params) if 'ERROR: Could not find last login' in output: self.skipTest('Could not find last login') result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) diff --git a/utils/test/test-aa-show-usage.py b/utils/test/test-aa-show-usage.py new file mode 100644 index 000000000..10251e2fd --- /dev/null +++ b/utils/test/test-aa-show-usage.py @@ -0,0 +1,118 @@ +#! /usr/bin/python3 +# ------------------------------------------------------------------ +# +# Copyright (C) 2025 Maxime Bélair +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of version 2 of the GNU General Public +# License published by the Free Software Foundation. +# +# ------------------------------------------------------------------ + +import os +import sys +import unittest +import subprocess + +import apparmor.aa as aa +from apparmor.common import cmd +from common_test import AATest, setup_aa, setup_all_loops + + +class AAShowUsageTest(AATest): + + def test_help_contents(self): + """Test output of help text""" + + expected_return_code = 0 + + expected_output_1 = \ +'''usage: aa-show-usage [-h] [-s {all,used,unused}] [-j] [-d DIR] + [--show-matching-path] [--filter.flags FLAGS] + [--filter.profile_name PROFILE_NAME] + [--filter.profile_attach PROFILE_ATTACH] + [--filter.profile_path PROFILE_PATH] + +Check which profiles are used +''' # noqa: E128 + + expected_output_2 = \ +''' + -h, --help show this help message and exit + -s, --show-type {all,used,unused} + Type of profiles to show + -j, --json Output in JSON + -d, --dir DIR Path to profiles + --show-matching-path Show the path of a file matching the profile + +Filtering options: + Filters are used to reduce the output of information to only those entries + that will match the filter. Filters use Python's regular expression syntax. + + --filter.flags FLAGS Filter by flags + --filter.profile_name PROFILE_NAME + Filter by profile name + --filter.profile_attach PROFILE_ATTACH + Filter by profile attachment + --filter.profile_path PROFILE_PATH + Filter by profile path +''' # noqa: E128 + + if sys.version_info[:2] < (3, 13): + # Python 3.13 tweaked argparse output [1]. When running on older + # Python versions, we adapt the expected output to match. + # + # https://github.com/python/cpython/pull/103372 + patches = [( + '-s, --show-type {all,used,unused}', + '-s {all,used,unused}, --show-type {all,used,unused}', + ), ( + '-d, --dir DIR Path to profiles', + '-d DIR, --dir DIR Path to profiles' + )] + for patch in patches: + expected_output_2 = expected_output_2.replace(patch[0], patch[1]) + + return_code, output = cmd([aashowusage_bin, '--help']) + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + + self.assertIn(expected_output_1, output) + self.assertIn(expected_output_2, output) + + def test_show_unconfined_profiles(self): + expected_return_code = 0 + return_code, output = cmd([aashowusage_bin, '--filter.flags=unconfined', '-d', aa.profile_dir]) + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + + nb_profile = 0 + + for line in output.splitlines(): + if line.startswith(' Profile '): + nb_profile += 1 + + command = ['grep', '-Er', r'flags=.*unconfined.*\{', '--', aa.profile_dir] + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, check=False) + self.assertEqual( + len(result.stdout.splitlines()), nb_profile, + "Error found {} profiles, expected {}\n\n Output was: \n {}. Grepped profiles are: {}".format( + nb_profile, len(result.stdout.splitlines()), output, result.stdout) + ) + + +setup_aa(aa) # Wrapper for aa.init_aa() +setup_all_loops(__name__) + +# The location of the aa-show-usage utility can be overridden by setting +# the APPARMOR_SHOW_USAGE or USE_SYSTEM environment variable; +# this is useful for running these tests in an installed environment +aashowusage_bin = "../aa-show-usage" + +if __name__ == '__main__': + if 'APPARMOR_SHOW_USAGE' in os.environ: + aashowusage_bin = os.environ['APPARMOR_SHOW_USAGE'] + elif 'USE_SYSTEM' in os.environ: + aashowusage_bin = 'aa-show-usage' + + unittest.main(verbosity=1) diff --git a/utils/test/test-aa.py b/utils/test/test-aa.py index 6db9414b8..c5cde894c 100644 --- a/utils/test/test-aa.py +++ b/utils/test/test-aa.py @@ -552,7 +552,7 @@ class AaTest_parse_profile_data(AATest): self.assertEqual(prof['/foo']['name'], '/foo') self.assertEqual(prof['/foo']['filename'], 'somefile') self.assertEqual(prof['/foo']['flags'], None) - self.assertEqual(prof['/foo']['xattrs'], 'user.bar=bar') + self.assertEqual(prof['/foo']['xattrs'], {'user.bar': 'bar'}) def test_parse_xattrs_02(self): prof = parse_profile_data('/foo xattrs=(user.bar=bar user.foo=*) {\n}\n'.split(), 'somefile', False, False) @@ -561,7 +561,7 @@ class AaTest_parse_profile_data(AATest): self.assertEqual(prof['/foo']['name'], '/foo') self.assertEqual(prof['/foo']['filename'], 'somefile') self.assertEqual(prof['/foo']['flags'], None) - self.assertEqual(prof['/foo']['xattrs'], 'user.bar=bar user.foo=*') + self.assertEqual(prof['/foo']['xattrs'], {'user.bar': 'bar', 'user.foo': '*'}) def test_parse_xattrs_03(self): d = '/foo xattrs=(user.bar=bar) flags=(complain) {\n}\n' @@ -571,7 +571,7 @@ class AaTest_parse_profile_data(AATest): self.assertEqual(prof['/foo']['name'], '/foo') self.assertEqual(prof['/foo']['filename'], 'somefile') self.assertEqual(prof['/foo']['flags'], 'complain') - self.assertEqual(prof['/foo']['xattrs'], 'user.bar=bar') + self.assertEqual(prof['/foo']['xattrs'], {'user.bar': 'bar'}) def test_parse_xattrs_04(self): with self.assertRaises(AppArmorException): diff --git a/utils/test/test-profile-storage.py b/utils/test/test-profile-storage.py index 21d4e0e34..e4a4ea349 100644 --- a/utils/test/test-profile-storage.py +++ b/utils/test/test-profile-storage.py @@ -41,30 +41,30 @@ class TestUnknownKey(AATest): class AaTest_get_header(AATest): tests = ( # name embedded_hat depth flags attachment xattrs prof.keyw. comment expected - (('/foo', False, 1, 'complain', '', '', False, ''), ' /foo flags=(complain) {'), - (('/foo', True, 1, 'complain', '', '', False, ''), ' profile /foo flags=(complain) {'), - (('/foo sp', False, 2, 'complain', '', '', False, ''), ' "/foo sp" flags=(complain) {'), - (('/foo', True, 2, 'complain', '', '', False, ''), ' profile /foo flags=(complain) {'), - (('/foo', False, 0, None, '', '', False, ''), '/foo {'), - (('/foo', False, 0, None, '', 'user.foo=bar', False, ''), '/foo xattrs=(user.foo=bar) {'), - (('/foo', True, 0, None, '', '', False, ''), 'profile /foo {'), - (('bar', False, 1, 'complain', '', '', False, ''), ' profile bar flags=(complain) {'), - (('bar', False, 1, 'complain', '/foo', '', False, ''), ' profile bar /foo flags=(complain) {'), - (('bar', True, 1, 'complain', '/foo', '', False, ''), ' profile bar /foo flags=(complain) {'), - (('bar baz', False, 1, None, '/foo', '', False, ''), ' profile "bar baz" /foo {'), - (('bar', True, 1, None, '/foo', '', False, ''), ' profile bar /foo {'), - (('bar baz', False, 1, 'complain', '/foo sp', '', False, ''), ' profile "bar baz" "/foo sp" flags=(complain) {'), - (('bar baz', False, 1, 'complain', '/foo sp', 'user.foo=bar', False, ''), ' profile "bar baz" "/foo sp" xattrs=(user.foo=bar) flags=(complain) {'), - (('^foo', False, 1, 'complain', '', '', False, ''), ' profile ^foo flags=(complain) {'), - (('^foo', True, 1, 'complain', '', '', False, ''), ' ^foo flags=(complain) {'), - (('^foo', True, 1.5, 'complain', '', '', False, ''), ' ^foo flags=(complain) {'), - (('^foo', True, 1.3, 'complain', '', '', False, ''), ' ^foo flags=(complain) {'), - (('/foo', False, 1, 'complain', '', '', True, ''), ' profile /foo flags=(complain) {'), - (('/foo', True, 1, 'complain', '', '', True, ''), ' profile /foo flags=(complain) {'), - (('/foo', False, 1, 'complain', '', '', False, '# x'), ' /foo flags=(complain) { # x'), - (('/foo', True, 1, None, '', '', False, '# x'), ' profile /foo { # x'), - (('/foo', False, 1, None, '', '', True, '# x'), ' profile /foo { # x'), - (('/foo', True, 1, 'complain', '', '', True, '# x'), ' profile /foo flags=(complain) { # x'), + (('/foo', False, 1, 'complain', '', {}, False, ''), ' /foo flags=(complain) {'), + (('/foo', True, 1, 'complain', '', {}, False, ''), ' profile /foo flags=(complain) {'), + (('/foo sp', False, 2, 'complain', '', {}, False, ''), ' "/foo sp" flags=(complain) {'), + (('/foo', True, 2, 'complain', '', {}, False, ''), ' profile /foo flags=(complain) {'), + (('/foo', False, 0, None, '', {}, False, ''), '/foo {'), + (('/foo', False, 0, None, '', {'user.foo': 'bar'}, False, ''), '/foo xattrs=(user.foo=bar) {'), + (('/foo', True, 0, None, '', {}, False, ''), 'profile /foo {'), + (('bar', False, 1, 'complain', '', {}, False, ''), ' profile bar flags=(complain) {'), + (('bar', False, 1, 'complain', '/foo', {}, False, ''), ' profile bar /foo flags=(complain) {'), + (('bar', True, 1, 'complain', '/foo', {}, False, ''), ' profile bar /foo flags=(complain) {'), + (('bar baz', False, 1, None, '/foo', {}, False, ''), ' profile "bar baz" /foo {'), + (('bar', True, 1, None, '/foo', {}, False, ''), ' profile bar /foo {'), + (('bar baz', False, 1, 'complain', '/foo sp', {}, False, ''), ' profile "bar baz" "/foo sp" flags=(complain) {'), + (('bar baz', False, 1, 'complain', '/foo sp', {'user.foo': 'bar'}, False, ''), ' profile "bar baz" "/foo sp" xattrs=(user.foo=bar) flags=(complain) {'), + (('^foo', False, 1, 'complain', '', {}, False, ''), ' profile ^foo flags=(complain) {'), + (('^foo', True, 1, 'complain', '', {}, False, ''), ' ^foo flags=(complain) {'), + (('^foo', True, 1.5, 'complain', '', {}, False, ''), ' ^foo flags=(complain) {'), + (('^foo', True, 1.3, 'complain', '', {}, False, ''), ' ^foo flags=(complain) {'), + (('/foo', False, 1, 'complain', '', {}, True, ''), ' profile /foo flags=(complain) {'), + (('/foo', True, 1, 'complain', '', {}, True, ''), ' profile /foo flags=(complain) {'), + (('/foo', False, 1, 'complain', '', {}, False, '# x'), ' /foo flags=(complain) { # x'), + (('/foo', True, 1, None, '', {}, False, '# x'), ' profile /foo { # x'), + (('/foo', False, 1, None, '', {}, True, '# x'), ' profile /foo { # x'), + (('/foo', True, 1, 'complain', '', {}, True, '# x'), ' profile /foo flags=(complain) { # x'), ) def _run_test(self, params, expected): @@ -88,8 +88,10 @@ class AaTest_get_header_01(AATest): ({'name': '/foo', 'depth': 1, 'flags': 'complain'}, ' /foo flags=(complain) {'), ({'name': '/foo', 'depth': 1, 'flags': 'complain', 'profile_keyword': True}, ' profile /foo flags=(complain) {'), ({'name': '/foo', 'flags': 'complain'}, '/foo flags=(complain) {'), - ({'name': '/foo', 'xattrs': 'user.foo=bar', 'flags': 'complain'}, '/foo xattrs=(user.foo=bar) flags=(complain) {'), - ({'name': '/foo', 'xattrs': 'user.foo=bar', 'embedded_hat': True}, 'profile /foo xattrs=(user.foo=bar) {'), + ({'name': '/foo', 'xattrs': {'user.foo': 'bar'}, 'flags': 'complain'}, '/foo xattrs=(user.foo=bar) flags=(complain) {'), + ({'name': '/foo', 'xattrs': {'user.foo': 'bar'}, 'embedded_hat': True}, 'profile /foo xattrs=(user.foo=bar) {'), + ({'name': '/foo', 'xattrs': {'user.foo': None}, 'embedded_hat': True}, 'profile /foo xattrs=(user.foo) {'), + ({'name': '/foo', 'xattrs': {'user.foo': None, 'user.bar': None}, 'embedded_hat': True}, 'profile /foo xattrs=(user.bar user.foo) {'), ) def _run_test(self, params, expected): @@ -178,6 +180,7 @@ class TestSetInvalid(AATest): (('attachment', None), AppArmorBug), (('filename', True), AppArmorBug), # expects string or None (('allow', None), AppArmorBug), # doesn't allow overwriting at all + (('xattrs', 0), AppArmorBug), # Invalid type ) def _run_test(self, params, expected): @@ -196,7 +199,7 @@ class AaTest_repr(AATest): def testRepr(self): prof_storage = ProfileStorage('foo', 'hat', 'TEST') prof_storage['name'] = 'foo' - prof_storage['xattrs'] = 'user.bar=bar' + prof_storage['xattrs'] = {'user.bar': 'bar'} prof_storage['capability'].add(CapabilityRule('dac_override')) self.assertEqual(str(prof_storage), '\n\nprofile foo xattrs=(user.bar=bar) {\n capability dac_override,\n\n}\n\n') @@ -205,15 +208,21 @@ class AaTest_repr(AATest): class AaTest_parse_profile_start(AATest): tests = ( # profile start line profile hat parent name profile hat attachment xattrs flags pps_set_hat_external - (('/foo {', None, None), ('', '/foo', '/foo', '/foo', '', '', None, False)), - (('/foo (complain) {', None, None), ('', '/foo', '/foo', '/foo', '', '', 'complain', False)), - (('profile foo /foo {', None, None), ('', 'foo', 'foo', 'foo', '/foo', '', None, False)), # named profile - (('profile /foo {', '/bar', None), ('/bar', '/foo', '/bar', '/foo', '', '', None, False)), # child profile - (('/foo//bar {', None, None), ('/foo', '/foo//bar', '/foo', 'bar', '', '', None, True)), # external hat - (('profile "/foo" (complain) {', None, None), ('', '/foo', '/foo', '/foo', '', '', 'complain', False)), - (('profile "/foo" xattrs=(user.bar=bar) {', None, None), ('', '/foo', '/foo', '/foo', '', 'user.bar=bar', None, False)), - (('profile "/foo" xattrs=(user.bar=bar user.foo=*) {', None, None), ('', '/foo', '/foo', '/foo', '', 'user.bar=bar user.foo=*', None, False)), - (('/usr/bin/xattrs-test xattrs=(myvalue="foo.bar") {', None, None), ('', '/usr/bin/xattrs-test', '/usr/bin/xattrs-test', '/usr/bin/xattrs-test', '', 'myvalue="foo.bar"', None, False)), + (('/foo {', None, None), ('', '/foo', '/foo', '/foo', '', {}, None, False)), + (('/foo (complain) {', None, None), ('', '/foo', '/foo', '/foo', '', {}, 'complain', False)), + (('profile foo /foo {', None, None), ('', 'foo', 'foo', 'foo', '/foo', {}, None, False)), # named profile + (('profile /foo {', '/bar', None), ('/bar', '/foo', '/bar', '/foo', '', {}, None, False)), # child profile + (('profile /foo xattrs=() {', '/bar', None), ('/bar', '/foo', '/bar', '/foo', '', {}, None, False)), + (('/foo//bar {', None, None), ('/foo', '/foo//bar', '/foo', 'bar', '', {}, None, True)), # external hat + (('profile "/foo" (complain) {', None, None), ('', '/foo', '/foo', '/foo', '', {}, 'complain', False)), + (('profile "/foo" xattrs=() {', None, None), ('', '/foo', '/foo', '/foo', '', {}, None, False)), + (('profile "/foo" xattrs=(user.bar) {', None, None), ('', '/foo', '/foo', '/foo', '', {'user.bar': None}, None, False)), + (('profile "/foo" xattrs=(user.foo user.bar) {', None, None), ('', '/foo', '/foo', '/foo', '', {'user.bar': None, 'user.foo': None}, + None, False)), # noqa: E127 + (('profile "/foo" xattrs=(user.bar=bar) {', None, None), ('', '/foo', '/foo', '/foo', '', {'user.bar': 'bar'}, None, False)), + (('profile "/foo" xattrs=(user.bar=bar user.foo=*) {', None, None), ('', '/foo', '/foo', '/foo', '', {'user.bar': 'bar', 'user.foo': '*'}, + None, False)), # noqa: E127 + (('/usr/bin/xattrs-test xattrs=(myvalue="foo.bar") {', None, None), ('', '/usr/bin/xattrs-test', '/usr/bin/xattrs-test', '/usr/bin/xattrs-test', '', {'myvalue': '"foo.bar"'}, None, False)), ) def _run_test(self, params, expected): diff --git a/utils/test/test-regex_matches.py b/utils/test/test-regex_matches.py index 243707762..d63046b60 100644 --- a/utils/test/test-regex_matches.py +++ b/utils/test/test-regex_matches.py @@ -17,7 +17,8 @@ from apparmor.regex import ( RE_PROFILE_CAP, RE_PROFILE_DBUS, RE_PROFILE_MOUNT, RE_PROFILE_PTRACE, RE_PROFILE_SIGNAL, RE_PROFILE_START, parse_profile_start_line, re_match_include, RE_PROFILE_UNIX, RE_PROFILE_PIVOT_ROOT, - re_match_include_parse, strip_parenthesis, strip_quotes) + re_match_include_parse, strip_parenthesis, strip_quotes, resolve_variables, expand_braces, + expand_var, expand_string, re_print_dict, re_parse_dict) from common_test import AATest, setup_aa, setup_all_loops @@ -724,6 +725,159 @@ class TestStripQuotes(AATest): self.assertEqual(strip_quotes(params), expected) +class TestExpandBraces(AATest): + tests = ( + ('foo', ['foo']), + ('/{,foo}', ['/', '/foo']), + ('/{,foo,bar}', ['/', '/foo', '/bar']), + ('/{bin,sbin}/runc', ['/bin/runc', '/sbin/runc']), + ('/{,usr/}{,s}bin/runc', ['/bin/runc', '/sbin/runc', '/usr/bin/runc', '/usr/sbin/runc']), + ('/{,usr/{,s}}bin/runc', ['/bin/runc', '/usr/bin/runc', '/usr/sbin/runc']), + ('{{a,b},{c,{d,e}},f}', ['a', 'b', 'c', 'd', 'e', 'f']), + ('{,b}{d,e}', ['d', 'e', 'bd', 'be']), + ('{aa,b}{d,e}', ['aad', 'aae', 'bd', 'be']), + ('{{foo,bar},{baz,qux}}', ['foo', 'bar', 'baz', 'qux']), + ) + + def _run_test(self, params, expected): + self.assertEqual(expand_braces(params), expected) + + +class TestInvalidExpandBraces(AATest): + tests = ( + # Malformed expressions + ('/{', AppArmorException), + ('/{}}{', AppArmorException), + ('/}', AppArmorException), + ('/{foo,bar}}', AppArmorException), + ('/{a,b},{c,d}}', AppArmorException), + # Braces should always provide at least 2 alternatives + ('{foo}', AppArmorException), + ('/{foo}/', AppArmorException), + ('/{,foo}{bar}', AppArmorException), + + ) + + def _run_test(self, params, expected): + with self.assertRaises(expected): + expand_braces(params) + + +var_dict = { + # Normal variables + '@{a}': ['AAA'], + '@{b}': ['BBB'], + # Multiple values variables + '@{c}': ['CC', 'CCC'], + '@{d}': ['DD', 'DDD', 'DDDD'], + # Variable relying on other variables + '@{e}': ['@{a}'], + '@{f}': ['@{e}@{b}', '@{c}'], + '@{g}': ['/bin/@{f}/{foo,bar}'], + # Invalid variables + '@{h}': ['@{h}'], # Self reference + '@{i}': ['@{j}'], # Circular reference with i and j + '@{j}': ['@{i}'], +} + + +class TestResolveVariables(AATest): + tests = ( + ('@{a}', ['AAA']), + ('/@{a}/@{b}/', ['/AAA/BBB/']), + ('/@{a}/@{c}/', ['/AAA/CC/', '/AAA/CCC/']), + ('/@{a}/@{d}/', ['/AAA/DD/', '/AAA/DDD/', '/AAA/DDDD/']), + ('/@{c}/@{d}/', ['/CC/DD/', '/CC/DDD/', '/CC/DDDD/', '/CCC/DD/', '/CCC/DDD/', '/CCC/DDDD/']), + ('/@{e}/', ['/AAA/']), + ('/@{f}/', ['/AAABBB/', '/CC/', '/CCC/']), + ('@{g}', ['/bin/AAABBB/{foo,bar}', '/bin/CC/{foo,bar}', '/bin/CCC/{foo,bar}']), + ) + + def _run_test(self, params, expected): + self.assertEqual(resolve_variables(params, var_dict), expected) + + +class TestInvalidResolveVariables(AATest): + tests = ( + ('@{h}', AppArmorException), + ('@{i}', AppArmorException), + ('@{z}', AppArmorException), # @{z} doesn't exist + ) + + def _run_test(self, params, expected): + with self.assertRaises(expected): + resolve_variables(params, var_dict) + + +class TestExpandVar(AATest): + tests = ( + (('@{a}', set()), ['AAA']), + (('@{d}', set()), ['DD', 'DDD', 'DDDD']), + (('@{f}', set()), ['AAABBB', 'CC', 'CCC']), + ) + + def _run_test(self, params, expected): + var, seen_vars = params + self.assertEqual(expand_var(var, var_dict, seen_vars), expected) + + +class TestInvalidExpandVar(AATest): + tests = ( + (('@{h}', set()), AppArmorException), # Circular dependency + (('@{a}', {'@{a}'}), AppArmorException), # Circular dependency + (('@{z}', set()), AppArmorException), # Invalid variable (not in var_dict) + ) + + def _run_test(self, params, expected): + with self.assertRaises(expected): + var, seen_vars = params + expand_var(var, var_dict, seen_vars) + + +class TestExpandString(AATest): + tests = ( + ('@{g}/@{a}', ['/bin/AAABBB/{foo,bar}/AAA', '/bin/CC/{foo,bar}/AAA', '/bin/CCC/{foo,bar}/AAA']), + ('@{g}/@{c}', ['/bin/AAABBB/{foo,bar}/CC', '/bin/AAABBB/{foo,bar}/CCC', '/bin/CC/{foo,bar}/CC', '/bin/CC/{foo,bar}/CCC', '/bin/CCC/{foo,bar}/CC', '/bin/CCC/{foo,bar}/CCC']), + ) + + def _run_test(self, params, expected): + self.assertEqual(expand_string(params, var_dict, set()), expected) + + +class TestInvalidExpandString(AATest): + tests = ( + (('@{h}', set()), AppArmorException), # Circular dependency + (('@{a}', {'@{a}'}), AppArmorException), # Circular dependency + (('@{z}', set()), AppArmorException), # Invalid variable (not in var_dict) + ) + + def _run_test(self, params, expected): + with self.assertRaises(expected): + var, seen_vars = params + expand_string(var, var_dict, seen_vars) + + +class TestRePrintDict(AATest): + tests = ( + ({'a': 'b'}, 'a=b'), + ({'a': 'b', 'bb': 'cc'}, 'a=b bb=cc'), + ({'z': 'c', 'y': 'b', 'x': 'a'}, 'x=a y=b z=c'), + ) + + def _run_test(self, params, expected): + self.assertEqual(re_print_dict(params), expected) + + +class TestReParseDict(AATest): + tests = ( + ('a=b', {'a': 'b'}), + (' a=bbb bb=cc', {'a': 'bbb', 'bb': 'cc'}), + ) + + def _run_test(self, params, expected): + self.assertEqual(re_parse_dict(params), expected) + + setup_aa(aa) setup_all_loops(__name__) if __name__ == '__main__':