mirror of
https://gitlab.com/apparmor/apparmor
synced 2025-09-04 08:15:21 +00:00
Compare commits
16 Commits
check-if-s
...
fix-dirtes
Author | SHA1 | Date | |
---|---|---|---|
|
c0815d0e0f | ||
|
c0b5d90848 | ||
|
e6cbdef4ab | ||
|
ea0dc96050 | ||
|
d442584a0a | ||
|
cf6606d380 | ||
|
85734c3bac | ||
|
47d68dac0f | ||
|
0c1eb3ec92 | ||
|
97bd86c7c6 | ||
|
0859b42ba2 | ||
|
e1011d646d | ||
|
db4939cf6a | ||
|
44c814e76c | ||
|
df97cf89bd | ||
|
8175d26cc3 |
@@ -77,7 +77,7 @@ test-utils:
|
||||
extends:
|
||||
- .ubuntu-before_script
|
||||
script:
|
||||
- apt-get install --no-install-recommends -y libc6-dev libjs-jquery libjs-jquery-throttle-debounce libjs-jquery-isonscreen libjs-jquery-tablesorter pyflakes3 python3-coverage python3-notify2 python3-psutil
|
||||
- apt-get install --no-install-recommends -y libc6-dev libjs-jquery libjs-jquery-throttle-debounce libjs-jquery-isonscreen libjs-jquery-tablesorter pyflakes3 python3-coverage python3-notify2 python3-psutil python3-setuptools
|
||||
# See apparmor/apparmor#221
|
||||
- make -C parser/tst gen_dbus
|
||||
- make -C parser/tst gen_xtrans
|
||||
|
@@ -1,6 +1 @@
|
||||
import sys
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
from LibAppArmor.LibAppArmor import *
|
||||
else:
|
||||
from .LibAppArmor import *
|
||||
from LibAppArmor.LibAppArmor import *
|
||||
|
@@ -10,8 +10,7 @@ test_python.py: test_python.py.in $(top_builddir)/config.status
|
||||
|
||||
CLEANFILES = test_python.py
|
||||
|
||||
# bah, how brittle is this?
|
||||
PYTHON_DIST_BUILD_PATH = '$(builddir)/../build/$$($(PYTHON) -c "import sysconfig; print(\"lib.%s-%s\" %(sysconfig.get_platform(), sysconfig.get_python_version()))")'
|
||||
PYTHON_DIST_BUILD_PATH = '$(builddir)/../build/$$($(PYTHON) buildpath.py)'
|
||||
|
||||
TESTS = test_python.py
|
||||
TESTS_ENVIRONMENT = \
|
||||
|
10
libraries/libapparmor/swig/python/test/buildpath.py
Normal file
10
libraries/libapparmor/swig/python/test/buildpath.py
Normal file
@@ -0,0 +1,10 @@
|
||||
#!/usr/bin/python3
|
||||
# the build path has changed in setuptools 61.2
|
||||
import sys
|
||||
import sysconfig
|
||||
import setuptools
|
||||
if tuple(map(int,setuptools.__version__.split("."))) >= (61, 2):
|
||||
identifier = sys.implementation.cache_tag
|
||||
else:
|
||||
identifier = "%d.%d" % sys.version_info[:2]
|
||||
print("lib.%s-%s" % (sysconfig.get_platform(), identifier))
|
@@ -17,7 +17,6 @@
|
||||
|
||||
from argparse import ArgumentParser
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import time
|
||||
import tempfile
|
||||
@@ -147,9 +146,6 @@ class AAParserCachingCommon(testlib.AATestTemplate):
|
||||
|
||||
class AAParserBasicCachingTests(AAParserCachingCommon):
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserBasicCachingTests, self).setUp()
|
||||
|
||||
def test_no_cache_by_default(self):
|
||||
'''test profiles are not cached by default'''
|
||||
|
||||
@@ -201,7 +197,7 @@ class AAParserAltCacheBasicTests(AAParserBasicCachingTests):
|
||||
'''Same tests as above, but with an alternate cache location specified on the command line'''
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserAltCacheBasicTests, self).setUp()
|
||||
super().setUp()
|
||||
|
||||
alt_cache_loc = tempfile.mkdtemp(prefix='aa-alt-cache', dir=self.tmp_dir)
|
||||
os.chmod(alt_cache_loc, 0o755)
|
||||
@@ -213,14 +209,14 @@ class AAParserAltCacheBasicTests(AAParserBasicCachingTests):
|
||||
def tearDown(self):
|
||||
if len(os.listdir(self.unused_cache_loc)) > 0:
|
||||
self.fail('original cache dir \'%s\' not empty' % self.unused_cache_loc)
|
||||
super(AAParserAltCacheBasicTests, self).tearDown()
|
||||
super().tearDown()
|
||||
|
||||
|
||||
class AAParserCreateCacheBasicTestsCacheExists(AAParserBasicCachingTests):
|
||||
'''Same tests as above, but with create cache option on the command line and the cache already exists'''
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserCreateCacheBasicTestsCacheExists, self).setUp()
|
||||
super().setUp()
|
||||
self.cmd_prefix.append('--create-cache-dir')
|
||||
|
||||
|
||||
@@ -228,7 +224,7 @@ class AAParserCreateCacheBasicTestsCacheNotExist(AAParserBasicCachingTests):
|
||||
'''Same tests as above, but with create cache option on the command line and cache dir removed'''
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserCreateCacheBasicTestsCacheNotExist, self).setUp()
|
||||
super().setUp()
|
||||
shutil.rmtree(self.cache_dir)
|
||||
self.cmd_prefix.append('--create-cache-dir')
|
||||
|
||||
@@ -238,7 +234,7 @@ class AAParserCreateCacheAltCacheTestsCacheNotExist(AAParserBasicCachingTests):
|
||||
alt cache specified, and cache dir removed'''
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserCreateCacheAltCacheTestsCacheNotExist, self).setUp()
|
||||
super().setUp()
|
||||
shutil.rmtree(self.cache_dir)
|
||||
self.cmd_prefix.append('--create-cache-dir')
|
||||
|
||||
@@ -246,7 +242,7 @@ class AAParserCreateCacheAltCacheTestsCacheNotExist(AAParserBasicCachingTests):
|
||||
class AAParserCachingTests(AAParserCachingCommon):
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserCachingTests, self).setUp()
|
||||
super().setUp()
|
||||
|
||||
r = testlib.filesystem_time_resolution()
|
||||
self.mtime_res = r[1]
|
||||
@@ -258,24 +254,10 @@ class AAParserCachingTests(AAParserCachingCommon):
|
||||
self.run_cmd_check(cmd)
|
||||
self.assert_path_exists(self.cache_file)
|
||||
|
||||
def _assertTimeStampEquals(self, time1, time2):
|
||||
'''Compare two timestamps to ensure equality'''
|
||||
|
||||
# python 3.2 and earlier don't support writing timestamps with
|
||||
# nanosecond resolution, only microsecond. When comparing
|
||||
# timestamps in such an environment, loosen the equality bounds
|
||||
# to compensate
|
||||
# Reference: https://bugs.python.org/issue12904
|
||||
(major, minor, _) = platform.python_version_tuple()
|
||||
if (int(major) < 3) or ((int(major) == 3) and (int(minor) <= 2)):
|
||||
self.assertAlmostEquals(time1, time2, places=5)
|
||||
else:
|
||||
self.assertEqual(time1, time2)
|
||||
|
||||
def _set_mtime(self, path, mtime):
|
||||
atime = os.stat(path).st_atime
|
||||
os.utime(path, (atime, mtime))
|
||||
self._assertTimeStampEquals(os.stat(path).st_mtime, mtime)
|
||||
self.assertEqual(os.stat(path).st_mtime, mtime)
|
||||
|
||||
def test_cache_loaded_when_exists(self):
|
||||
'''test cache is loaded when it exists, is newer than profile, and features match'''
|
||||
@@ -458,7 +440,7 @@ class AAParserCachingTests(AAParserCachingCommon):
|
||||
|
||||
stat = os.stat(self.cache_file)
|
||||
self.assertNotEqual(orig_stat.st_ino, stat.st_ino)
|
||||
self._assertTimeStampEquals(profile_mtime, stat.st_mtime)
|
||||
self.assertEqual(profile_mtime, stat.st_mtime)
|
||||
|
||||
def test_abstraction_newer_rewrites_cache(self):
|
||||
'''test cache is rewritten if abstraction is newer'''
|
||||
@@ -475,7 +457,7 @@ class AAParserCachingTests(AAParserCachingCommon):
|
||||
|
||||
stat = os.stat(self.cache_file)
|
||||
self.assertNotEqual(orig_stat.st_ino, stat.st_ino)
|
||||
self._assertTimeStampEquals(abstraction_mtime, stat.st_mtime)
|
||||
self.assertEqual(abstraction_mtime, stat.st_mtime)
|
||||
|
||||
def test_parser_newer_uses_cache(self):
|
||||
'''test cache is not skipped if parser is newer'''
|
||||
@@ -521,7 +503,7 @@ class AAParserAltCacheTests(AAParserCachingTests):
|
||||
check_orig_cache = True
|
||||
|
||||
def setUp(self):
|
||||
super(AAParserAltCacheTests, self).setUp()
|
||||
super().setUp()
|
||||
|
||||
alt_cache_loc = tempfile.mkdtemp(prefix='aa-alt-cache', dir=self.tmp_dir)
|
||||
os.chmod(alt_cache_loc, 0o755)
|
||||
@@ -534,7 +516,7 @@ class AAParserAltCacheTests(AAParserCachingTests):
|
||||
def tearDown(self):
|
||||
if self.check_orig_cache and len(os.listdir(self.orig_cache_dir)) > 0:
|
||||
self.fail('original cache dir \'%s\' not empty' % self.orig_cache_dir)
|
||||
super(AAParserAltCacheTests, self).tearDown()
|
||||
super().tearDown()
|
||||
|
||||
def test_cache_purge_leaves_original_cache_alone(self):
|
||||
'''test cache purging only touches alt cache'''
|
||||
|
@@ -31,8 +31,9 @@ do_tst() {
|
||||
shift 2
|
||||
#global tmpdir
|
||||
|
||||
${APPARMOR_PARSER} "$@" > "$tmpdir/out" 2>/dev/null
|
||||
${APPARMOR_PARSER} "$@" > "$tmpdir/out.unsorted" 2>/dev/null
|
||||
rc=$?
|
||||
LC_ALL=C sort "$tmpdir/out.unsorted" > "$tmpdir/out"
|
||||
if [ $rc -ne 0 ] && [ "$expected" != "fail" ] ; then
|
||||
echo "failed: expected \"$expected\" but parser returned error"
|
||||
return 1
|
||||
|
@@ -1,3 +1,3 @@
|
||||
good_target
|
||||
a_profile
|
||||
b_profile
|
||||
good_target
|
||||
|
@@ -13,12 +13,12 @@
|
||||
# TODO
|
||||
# - finish adding suppressions for valgrind false positives
|
||||
|
||||
from argparse import ArgumentParser # requires python 2.7 or newer
|
||||
from argparse import ArgumentParser
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
import testlib
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
DEFAULT_TESTDIR = "./simple_tests/vars"
|
||||
VALGRIND_ERROR_CODE = 151
|
||||
@@ -65,12 +65,9 @@ def find_testcases(testdir):
|
||||
|
||||
def create_suppressions():
|
||||
'''generate valgrind suppressions file'''
|
||||
|
||||
handle, name = tempfile.mkstemp(suffix='.suppressions', prefix='aa-parser-valgrind')
|
||||
os.close(handle)
|
||||
with open(name, "w+") as handle:
|
||||
handle.write(VALGRIND_SUPPRESSIONS)
|
||||
return name
|
||||
with NamedTemporaryFile("w+", suffix='.suppressions', prefix='aa-parser-valgrind', delete=False) as temp_file:
|
||||
temp_file.write(VALGRIND_SUPPRESSIONS)
|
||||
return temp_file.name
|
||||
|
||||
|
||||
def main():
|
||||
|
@@ -30,12 +30,16 @@ profile syslogd /{usr/,}{bin,sbin}/syslogd {
|
||||
|
||||
/dev/log wl,
|
||||
/var/lib/*/dev/log wl,
|
||||
/proc/kmsg r,
|
||||
|
||||
/dev/tty* w,
|
||||
/dev/xconsole rw,
|
||||
/etc/syslog.conf r,
|
||||
/etc/syslog.d/ r,
|
||||
/etc/syslog.d/* r,
|
||||
/{usr/,}{bin,sbin}/syslogd rmix,
|
||||
/var/log/** rw,
|
||||
@{run}/syslog.pid krwl,
|
||||
@{run}/syslogd.pid krwl,
|
||||
@{run}/utmp rw,
|
||||
/var/spool/compaq/nic/messages_fifo rw,
|
||||
|
@@ -13,7 +13,6 @@
|
||||
#
|
||||
# ----------------------------------------------------------------------
|
||||
# No old version logs, only 2.6 + supported
|
||||
from __future__ import division, with_statement
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
@@ -22,7 +21,8 @@ import sys
|
||||
import time
|
||||
import traceback
|
||||
import atexit
|
||||
import tempfile
|
||||
from shutil import which
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
import apparmor.config
|
||||
import apparmor.logparser
|
||||
@@ -33,7 +33,7 @@ from copy import deepcopy
|
||||
from apparmor.aare import AARE
|
||||
|
||||
from apparmor.common import (AppArmorException, AppArmorBug, cmd, is_skippable_file, open_file_read, valid_path, hasher,
|
||||
combine_profname, split_name, type_is_str, open_file_write, DebugLogger)
|
||||
combine_profname, split_name, open_file_write, DebugLogger)
|
||||
|
||||
import apparmor.ui as aaui
|
||||
|
||||
@@ -176,18 +176,6 @@ def check_for_apparmor(filesystem='/proc/filesystems', mounts='/proc/mounts'):
|
||||
break
|
||||
return aa_mountpoint
|
||||
|
||||
def which(file):
|
||||
"""Returns the executable fullpath for the file, None otherwise"""
|
||||
if sys.version_info >= (3, 3):
|
||||
return shutil.which(file)
|
||||
env_dirs = os.getenv('PATH').split(':')
|
||||
for env_dir in env_dirs:
|
||||
env_path = os.path.join(env_dir, file)
|
||||
# Test if the path is executable or not
|
||||
if os.access(env_path, os.X_OK):
|
||||
return env_path
|
||||
return None
|
||||
|
||||
def get_full_path(original_path):
|
||||
"""Return the full path after resolving any symlinks"""
|
||||
path = original_path
|
||||
@@ -626,11 +614,12 @@ def change_profile_flags(prof_filename, program, flag, set_flag):
|
||||
found = False
|
||||
depth = -1
|
||||
|
||||
if not flag or (type_is_str(flag) and flag.strip() == ''):
|
||||
if not flag or (type(flag) is str and flag.strip() == ''):
|
||||
raise AppArmorBug('New flag for %s is empty' % prof_filename)
|
||||
|
||||
with open_file_read(prof_filename) as f_in:
|
||||
temp_file = tempfile.NamedTemporaryFile('w', prefix=prof_filename, suffix='~', delete=False, dir=profile_dir)
|
||||
temp_file = NamedTemporaryFile('w', prefix=prof_filename, suffix='~', delete=False, dir=profile_dir)
|
||||
temp_file.close()
|
||||
shutil.copymode(prof_filename, temp_file.name)
|
||||
with open_file_write(temp_file.name) as f_out:
|
||||
for lineno, line in enumerate(f_in):
|
||||
@@ -1529,28 +1518,28 @@ def save_profiles(is_mergeprof=False):
|
||||
ans, arg = q.promptUser()
|
||||
|
||||
q.selected = arg # remember selection
|
||||
which = options[arg]
|
||||
profile_name = options[arg]
|
||||
|
||||
if ans == 'CMD_SAVE_SELECTED':
|
||||
write_profile_ui_feedback(which)
|
||||
reload_base(which)
|
||||
write_profile_ui_feedback(profile_name)
|
||||
reload_base(profile_name)
|
||||
q.selected = 0 # saving the selected profile removes it from the list, therefore reset selection
|
||||
|
||||
elif ans == 'CMD_VIEW_CHANGES':
|
||||
oldprofile = None
|
||||
if aa[which][which].get('filename', False):
|
||||
oldprofile = aa[which][which]['filename']
|
||||
if aa[profile_name][profile_name].get('filename', False):
|
||||
oldprofile = aa[profile_name][profile_name]['filename']
|
||||
else:
|
||||
oldprofile = get_profile_filename_from_attachment(which, True)
|
||||
oldprofile = get_profile_filename_from_attachment(profile_name, True)
|
||||
|
||||
serialize_options = {'METADATA': True}
|
||||
newprofile = serialize_profile(split_to_merged(aa), which, serialize_options)
|
||||
newprofile = serialize_profile(split_to_merged(aa), profile_name, serialize_options)
|
||||
|
||||
aaui.UI_Changes(oldprofile, newprofile, comments=True)
|
||||
|
||||
elif ans == 'CMD_VIEW_CHANGES_CLEAN':
|
||||
oldprofile = serialize_profile(split_to_merged(original_aa), which, {})
|
||||
newprofile = serialize_profile(split_to_merged(aa), which, {})
|
||||
oldprofile = serialize_profile(split_to_merged(original_aa), profile_name, {})
|
||||
newprofile = serialize_profile(split_to_merged(aa), profile_name, {})
|
||||
|
||||
aaui.UI_Changes(oldprofile, newprofile)
|
||||
|
||||
@@ -2166,7 +2155,6 @@ def write_profile_ui_feedback(profile, is_attachment=False):
|
||||
write_profile(profile, is_attachment)
|
||||
|
||||
def write_profile(profile, is_attachment=False):
|
||||
prof_filename = None
|
||||
if aa[profile][profile].get('filename', False):
|
||||
prof_filename = aa[profile][profile]['filename']
|
||||
elif is_attachment:
|
||||
@@ -2174,20 +2162,16 @@ def write_profile(profile, is_attachment=False):
|
||||
else:
|
||||
prof_filename = get_profile_filename_from_profile_name(profile, True)
|
||||
|
||||
newprof = tempfile.NamedTemporaryFile('w', suffix='~', delete=False, dir=profile_dir)
|
||||
if os.path.exists(prof_filename):
|
||||
shutil.copymode(prof_filename, newprof.name)
|
||||
else:
|
||||
#permission_600 = stat.S_IRUSR | stat.S_IWUSR # Owner read and write
|
||||
#os.chmod(newprof.name, permission_600)
|
||||
pass
|
||||
|
||||
serialize_options = {'METADATA': True, 'is_attachment': is_attachment}
|
||||
|
||||
profile_string = serialize_profile(split_to_merged(aa), profile, serialize_options)
|
||||
newprof.write(profile_string)
|
||||
newprof.close()
|
||||
|
||||
with NamedTemporaryFile('w', suffix='~', delete=False, dir=profile_dir) as newprof:
|
||||
if os.path.exists(prof_filename):
|
||||
shutil.copymode(prof_filename, newprof.name)
|
||||
else:
|
||||
# permission_600 = stat.S_IRUSR | stat.S_IWUSR # Owner read and write
|
||||
# os.chmod(newprof.name, permission_600)
|
||||
pass
|
||||
newprof.write(profile_string)
|
||||
os.rename(newprof.name, prof_filename)
|
||||
|
||||
if profile in changed:
|
||||
@@ -2385,10 +2369,7 @@ def check_qualifiers(program):
|
||||
|
||||
def get_subdirectories(current_dir):
|
||||
"""Returns a list of all directories directly inside given directory"""
|
||||
if sys.version_info < (3, 0):
|
||||
return os.walk(current_dir).next()[1]
|
||||
else:
|
||||
return os.walk(current_dir).__next__()[1]
|
||||
return next(os.walk(current_dir))[1]
|
||||
|
||||
def loadincludes():
|
||||
loadincludes_dir('tunables', True)
|
||||
|
@@ -14,9 +14,9 @@
|
||||
|
||||
import re
|
||||
|
||||
from apparmor.common import convert_regexp, type_is_str, AppArmorBug, AppArmorException
|
||||
from apparmor.common import convert_regexp, AppArmorBug, AppArmorException
|
||||
|
||||
class AARE(object):
|
||||
class AARE:
|
||||
'''AARE (AppArmor Regular Expression) wrapper class'''
|
||||
|
||||
def __init__(self, regex, is_path, log_event=None):
|
||||
@@ -68,7 +68,7 @@ class AARE(object):
|
||||
expression = expression.regex
|
||||
else:
|
||||
return self.is_equal(expression) # better safe than sorry
|
||||
elif not type_is_str(expression):
|
||||
elif type(expression) is not str:
|
||||
raise AppArmorBug('AARE.match() called with unknown object: %s' % str(expression))
|
||||
|
||||
if self._regex_compiled is None:
|
||||
@@ -81,7 +81,7 @@ class AARE(object):
|
||||
|
||||
if type(expression) == AARE:
|
||||
return self.regex == expression.regex
|
||||
elif type_is_str(expression):
|
||||
elif type(expression) is str:
|
||||
return self.regex == expression
|
||||
else:
|
||||
raise AppArmorBug('AARE.is_equal() called with unknown object: %s' % str(expression))
|
||||
|
@@ -14,7 +14,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
import apparmor.aa as apparmor
|
||||
|
||||
class Prof(object):
|
||||
class Prof:
|
||||
def __init__(self, filename):
|
||||
apparmor.init_aa()
|
||||
self.aa = apparmor.aa
|
||||
@@ -22,7 +22,7 @@ class Prof(object):
|
||||
self.include = apparmor.include
|
||||
self.filename = filename
|
||||
|
||||
class CleanProf(object):
|
||||
class CleanProf:
|
||||
def __init__(self, same_file, profile, other):
|
||||
#If same_file we're basically comparing the file against itself to check superfluous rules
|
||||
self.same_file = same_file
|
||||
|
@@ -9,8 +9,6 @@
|
||||
#
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
from __future__ import print_function
|
||||
import codecs
|
||||
import collections
|
||||
import glob
|
||||
import logging
|
||||
@@ -20,6 +18,8 @@ import subprocess
|
||||
import sys
|
||||
import termios
|
||||
import tty
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
import apparmor.rules as rules
|
||||
|
||||
DEBUGGING = False
|
||||
@@ -118,10 +118,7 @@ def cmd(command):
|
||||
except OSError as ex:
|
||||
return [127, str(ex)]
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
out = sp.communicate()[0].decode('ascii', 'ignore')
|
||||
else:
|
||||
out = sp.communicate()[0]
|
||||
out = sp.communicate()[0].decode('ascii', 'ignore')
|
||||
|
||||
return [sp.returncode, out]
|
||||
|
||||
@@ -134,10 +131,7 @@ def cmd_pipe(command1, command2):
|
||||
except OSError as ex:
|
||||
return [127, str(ex)]
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
out = sp2.communicate()[0].decode('ascii', 'ignore')
|
||||
else:
|
||||
out = sp2.communicate()[0]
|
||||
out = sp2.communicate()[0].decode('ascii', 'ignore')
|
||||
|
||||
return [sp2.returncode, out]
|
||||
|
||||
@@ -202,14 +196,7 @@ def open_file_anymode(mode, path, encoding='UTF-8'):
|
||||
# This avoids a crash when reading a logfile with special characters that
|
||||
# are not utf8-encoded (for example a latin1 "ö"), and also avoids crashes
|
||||
# at several other places we don't know yet ;-)
|
||||
errorhandling = 'surrogateescape'
|
||||
|
||||
if sys.version_info[0] < 3:
|
||||
errorhandling = 'replace'
|
||||
|
||||
orig = codecs.open(path, mode, encoding, errors=errorhandling)
|
||||
|
||||
return orig
|
||||
return open(path, mode, encoding=encoding, errors='surrogateescape')
|
||||
|
||||
def readkey():
|
||||
'''Returns the pressed key'''
|
||||
@@ -268,15 +255,6 @@ def user_perm(prof_dir):
|
||||
return False
|
||||
return True
|
||||
|
||||
if sys.version_info[0] > 2:
|
||||
unicode = str # python 3 dropped the unicode type. To keep type_is_str() simple (and pyflakes3 happy), re-create it as alias of str.
|
||||
|
||||
def type_is_str(var):
|
||||
''' returns True if the given variable is a str (or unicode string when using python 2)'''
|
||||
if type(var) in (str, unicode): # python 2 sometimes uses the 'unicode' type
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def split_name(full_profile):
|
||||
if '//' in full_profile:
|
||||
@@ -301,7 +279,7 @@ def combine_profname(name_parts):
|
||||
return '//'.join(name_parts)
|
||||
|
||||
|
||||
class DebugLogger(object):
|
||||
class DebugLogger:
|
||||
'''Unified debug facility. Logs to file or stderr.
|
||||
|
||||
Does not log anything by default. Will only log if environment variable
|
||||
@@ -336,8 +314,8 @@ class DebugLogger(object):
|
||||
format='%(asctime)s - %(name)s - %(message)s\n')
|
||||
except IOError:
|
||||
# Unable to open the default logfile, so create a temporary logfile and tell use about it
|
||||
import tempfile
|
||||
templog = tempfile.NamedTemporaryFile('w', prefix='apparmor', suffix='.log', delete=False)
|
||||
templog = NamedTemporaryFile('w', prefix='apparmor', suffix='.log', delete=False)
|
||||
templog.close()
|
||||
sys.stdout.write("\nCould not open: %s\nLogging to: %s\n" % (self.logfile, templog.name))
|
||||
|
||||
logging.basicConfig(filename=templog.name, level=self.debug_level,
|
||||
|
@@ -11,29 +11,12 @@
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# ----------------------------------------------------------------------
|
||||
from __future__ import with_statement
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import stat
|
||||
import sys
|
||||
import tempfile
|
||||
if sys.version_info < (3, 0):
|
||||
import ConfigParser as configparser
|
||||
|
||||
# Class to provide the object[section][option] behavior in Python2
|
||||
class configparser_py2(configparser.ConfigParser):
|
||||
def __getitem__(self, section):
|
||||
section_val = self.items(section)
|
||||
section_options = dict()
|
||||
for option, value in section_val:
|
||||
section_options[option] = value
|
||||
return section_options
|
||||
|
||||
|
||||
else:
|
||||
import configparser
|
||||
|
||||
from configparser import ConfigParser
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from apparmor.common import AppArmorException, open_file_read # , warn, msg,
|
||||
|
||||
@@ -41,7 +24,7 @@ from apparmor.common import AppArmorException, open_file_read # , warn, msg,
|
||||
# CFG = None
|
||||
# REPO_CFG = None
|
||||
# SHELL_FILES = ['easyprof.conf', 'notify.conf', 'parser.conf']
|
||||
class Config(object):
|
||||
class Config:
|
||||
def __init__(self, conf_type, conf_dir='/etc/apparmor'):
|
||||
self.CONF_DIR = conf_dir
|
||||
# The type of config file that'll be read and/or written
|
||||
@@ -55,7 +38,7 @@ class Config(object):
|
||||
if self.conf_type == 'shell':
|
||||
config = {'': dict()}
|
||||
elif self.conf_type == 'ini':
|
||||
config = configparser.ConfigParser()
|
||||
config = ConfigParser()
|
||||
return config
|
||||
|
||||
def read_config(self, filename):
|
||||
@@ -65,21 +48,10 @@ class Config(object):
|
||||
if self.conf_type == 'shell':
|
||||
config = self.read_shell(filepath)
|
||||
elif self.conf_type == 'ini':
|
||||
if sys.version_info > (3, 0):
|
||||
config = configparser.ConfigParser()
|
||||
else:
|
||||
config = configparser_py2()
|
||||
config = ConfigParser()
|
||||
# Set the option form to string -prevents forced conversion to lowercase
|
||||
config.optionxform = str
|
||||
if sys.version_info > (3, 0):
|
||||
config.read(filepath)
|
||||
else:
|
||||
try:
|
||||
config.read(filepath)
|
||||
except configparser.ParsingError:
|
||||
tmp_filepath = py2_parser(filepath)
|
||||
config.read(tmp_filepath.name)
|
||||
##config.__get__()
|
||||
config.read(filepath)
|
||||
return config
|
||||
|
||||
def write_config(self, filename, config):
|
||||
@@ -88,18 +60,17 @@ class Config(object):
|
||||
permission_600 = stat.S_IRUSR | stat.S_IWUSR # Owner read and write
|
||||
try:
|
||||
# Open a temporary file in the CONF_DIR to write the config file
|
||||
config_file = tempfile.NamedTemporaryFile('w', prefix='aa_temp', delete=False, dir=self.CONF_DIR)
|
||||
if os.path.exists(self.input_file):
|
||||
# Copy permissions from an existing file to temporary file
|
||||
shutil.copymode(self.input_file, config_file.name)
|
||||
else:
|
||||
# If no existing permission set the file permissions as 0600
|
||||
os.chmod(config_file.name, permission_600)
|
||||
if self.conf_type == 'shell':
|
||||
self.write_shell(filepath, config_file, config)
|
||||
elif self.conf_type == 'ini':
|
||||
self.write_configparser(filepath, config_file, config)
|
||||
config_file.close()
|
||||
with NamedTemporaryFile('w', prefix='aa_temp', delete=False, dir=self.CONF_DIR) as config_file:
|
||||
if os.path.exists(self.input_file):
|
||||
# Copy permissions from an existing file to temporary file
|
||||
shutil.copymode(self.input_file, config_file.name)
|
||||
else:
|
||||
# If no existing permission set the file permissions as 0600
|
||||
os.chmod(config_file.name, permission_600)
|
||||
if self.conf_type == 'shell':
|
||||
self.write_shell(filepath, config_file, config)
|
||||
elif self.conf_type == 'ini':
|
||||
self.write_configparser(filepath, config_file, config)
|
||||
except IOError:
|
||||
raise AppArmorException("Unable to write to %s" % filename)
|
||||
else:
|
||||
@@ -275,18 +246,3 @@ class Config(object):
|
||||
for option in options:
|
||||
line = ' ' + option + ' = ' + config[section][option] + '\n'
|
||||
f_out.write(line)
|
||||
|
||||
def py2_parser(filename):
|
||||
"""Returns the de-dented ini file from the new format ini"""
|
||||
tmp = tempfile.NamedTemporaryFile('rw')
|
||||
if os.path.exists(filename):
|
||||
with open(tmp.name, 'w') as f_out, open_file_read(filename) as f_in:
|
||||
for line in f_in:
|
||||
# The ini format allows for multi-line entries, with the subsequent
|
||||
# entries being indented deeper hence simple lstrip() is not appropriate
|
||||
if line[:2] == ' ':
|
||||
line = line[2:]
|
||||
elif line[0] == '\t':
|
||||
line = line[1:]
|
||||
f_out.write(line)
|
||||
return tmp
|
||||
|
@@ -8,22 +8,18 @@
|
||||
#
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
from __future__ import with_statement
|
||||
|
||||
import codecs
|
||||
import copy
|
||||
import glob
|
||||
import json
|
||||
import optparse
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from shutil import which
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from apparmor.aa import which
|
||||
from apparmor.common import AppArmorException
|
||||
from apparmor.common import AppArmorException, open_file_read
|
||||
|
||||
|
||||
DEBUGGING = False
|
||||
@@ -226,15 +222,6 @@ def get_directory_contents(path):
|
||||
files.sort()
|
||||
return files
|
||||
|
||||
def open_file_read(path):
|
||||
'''Open specified file read-only'''
|
||||
try:
|
||||
orig = codecs.open(path, 'r', "UTF-8")
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
return orig
|
||||
|
||||
|
||||
def verify_policy(policy, exe, base=None, include=None):
|
||||
'''Verify policy compiles'''
|
||||
@@ -242,16 +229,15 @@ def verify_policy(policy, exe, base=None, include=None):
|
||||
warn("Could not find apparmor_parser. Skipping verify")
|
||||
return True
|
||||
|
||||
fn = ""
|
||||
# if policy starts with '/' and is one line, assume it is a path
|
||||
if len(policy.splitlines()) == 1 and valid_path(policy):
|
||||
fn = policy
|
||||
else:
|
||||
f, fn = tempfile.mkstemp(prefix='aa-easyprof')
|
||||
if not isinstance(policy, bytes):
|
||||
policy = policy.encode('utf-8')
|
||||
os.write(f, policy)
|
||||
os.close(f)
|
||||
with NamedTemporaryFile('wb', prefix='aa-easyprof', delete=False) as f:
|
||||
fn = f.name
|
||||
if not isinstance(policy, bytes):
|
||||
policy = policy.encode('utf-8')
|
||||
f.write(policy)
|
||||
|
||||
command = [exe, '-QTK']
|
||||
if base:
|
||||
@@ -262,9 +248,7 @@ def verify_policy(policy, exe, base=None, include=None):
|
||||
|
||||
rc, out = cmd(command)
|
||||
os.unlink(fn)
|
||||
if rc == 0:
|
||||
return True
|
||||
return False
|
||||
return rc == 0
|
||||
|
||||
#
|
||||
# End utility functions
|
||||
@@ -385,19 +369,17 @@ class AppArmorEasyProfile:
|
||||
raise AppArmorException("Could not find '%s'" % self.conffile)
|
||||
|
||||
# Read in the configuration
|
||||
f = open_file_read(self.conffile)
|
||||
|
||||
pat = re.compile(r'^\w+=".*"?')
|
||||
for line in f:
|
||||
if not pat.search(line):
|
||||
continue
|
||||
if line.startswith("POLICYGROUPS_DIR="):
|
||||
d = re.split(r'=', line.strip())[1].strip('["\']')
|
||||
self.dirs['policygroups'] = d
|
||||
elif line.startswith("TEMPLATES_DIR="):
|
||||
d = re.split(r'=', line.strip())[1].strip('["\']')
|
||||
self.dirs['templates'] = d
|
||||
f.close()
|
||||
with open_file_read(self.conffile) as f:
|
||||
for line in f:
|
||||
if not pat.search(line):
|
||||
continue
|
||||
if line.startswith("POLICYGROUPS_DIR="):
|
||||
d = re.split(r'=', line.strip())[1].strip('["\']')
|
||||
self.dirs['policygroups'] = d
|
||||
elif line.startswith("TEMPLATES_DIR="):
|
||||
d = re.split(r'=', line.strip())[1].strip('["\']')
|
||||
self.dirs['templates'] = d
|
||||
|
||||
keys = self.dirs.keys()
|
||||
if 'templates' not in keys:
|
||||
@@ -693,13 +675,11 @@ class AppArmorEasyProfile:
|
||||
if not os.path.isdir(dir):
|
||||
raise AppArmorException("'%s' is not a directory" % dir)
|
||||
|
||||
f, fn = tempfile.mkstemp(prefix='aa-easyprof')
|
||||
if not isinstance(policy, bytes):
|
||||
policy = policy.encode('utf-8')
|
||||
os.write(f, policy)
|
||||
os.close(f)
|
||||
|
||||
shutil.move(fn, out_fn)
|
||||
with NamedTemporaryFile('wb', prefix='aa-easyprof', suffix='~', delete=False) as f:
|
||||
f.write(policy)
|
||||
os.rename(f.name, out_fn)
|
||||
|
||||
def gen_manifest(self, params):
|
||||
'''Take params list and output a JSON file'''
|
||||
|
@@ -8,13 +8,10 @@
|
||||
#
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
from __future__ import print_function # needed in py2 for print('...', file=sys.stderr)
|
||||
|
||||
import cgitb
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from apparmor.common import error
|
||||
|
||||
@@ -34,21 +31,18 @@ def handle_exception(*exc_info):
|
||||
print('', file=sys.stderr)
|
||||
error(ex.value)
|
||||
else:
|
||||
(fd, path) = tempfile.mkstemp(prefix='apparmor-bugreport-', suffix='.txt')
|
||||
file = os.fdopen(fd, 'w')
|
||||
#file = open_file_write(path) # writes everything converted to utf8 - not sure if we want this...
|
||||
with NamedTemporaryFile('w', prefix='apparmor-bugreport-', suffix='.txt', delete=False) as file:
|
||||
cgitb_hook = cgitb.Hook(display=1, file=file, format='text', context=10)
|
||||
cgitb_hook.handle(exc_info)
|
||||
|
||||
cgitb_hook = cgitb.Hook(display=1, file=file, format='text', context=10)
|
||||
cgitb_hook.handle(exc_info)
|
||||
|
||||
file.write('Please consider reporting a bug at https://gitlab.com/apparmor/apparmor/-/issues\n')
|
||||
file.write('and attach this file.\n')
|
||||
file.write('Please consider reporting a bug at https://gitlab.com/apparmor/apparmor/-/issues\n')
|
||||
file.write('and attach this file.\n')
|
||||
|
||||
print(''.join(traceback.format_exception(*exc_info)), file=sys.stderr)
|
||||
print('', file=sys.stderr)
|
||||
print('An unexpected error occurred!', file=sys.stderr)
|
||||
print('', file=sys.stderr)
|
||||
print('For details, see %s' % path, file=sys.stderr)
|
||||
print('For details, see %s' % file.name, file=sys.stderr)
|
||||
print('Please consider reporting a bug at https://gitlab.com/apparmor/apparmor/-/issues', file=sys.stderr)
|
||||
print('and attach this file.', file=sys.stderr)
|
||||
|
||||
|
@@ -79,9 +79,6 @@ class ReadLog:
|
||||
"""Parse the event from log into key value pairs"""
|
||||
msg = msg.strip()
|
||||
self.debug_logger.info('parse_event: %s' % msg)
|
||||
if sys.version_info < (3, 0):
|
||||
# parse_record fails with u'foo' style strings hence typecasting to string
|
||||
msg = str(msg)
|
||||
event = LibAppArmor.parse_record(msg)
|
||||
ev = dict()
|
||||
ev['resource'] = event.info
|
||||
@@ -266,32 +263,31 @@ class ReadLog:
|
||||
self.LOG = open_file_read(self.filename)
|
||||
except IOError:
|
||||
raise AppArmorException('Can not read AppArmor logfile: ' + self.filename)
|
||||
line = True
|
||||
while line:
|
||||
line = self.get_next_log_entry()
|
||||
if not line:
|
||||
break
|
||||
line = line.strip()
|
||||
self.debug_logger.debug('read_log: %s' % line)
|
||||
if self.logmark in line:
|
||||
seenmark = True
|
||||
with self.LOG:
|
||||
line = True
|
||||
while line:
|
||||
line = self.get_next_log_entry()
|
||||
if not line:
|
||||
break
|
||||
line = line.strip()
|
||||
self.debug_logger.debug('read_log: %s' % line)
|
||||
if self.logmark in line:
|
||||
seenmark = True
|
||||
|
||||
self.debug_logger.debug('read_log: seenmark = %s' % seenmark)
|
||||
if not seenmark:
|
||||
continue
|
||||
self.debug_logger.debug('read_log: seenmark = %s' % seenmark)
|
||||
if not seenmark:
|
||||
continue
|
||||
|
||||
event = self.parse_event(line)
|
||||
if event:
|
||||
try:
|
||||
self.parse_event_for_tree(event)
|
||||
event = self.parse_event(line)
|
||||
if event:
|
||||
try:
|
||||
self.parse_event_for_tree(event)
|
||||
|
||||
except AppArmorException as e:
|
||||
ex_msg = ('%(msg)s\n\nThis error was caused by the log line:\n%(logline)s' %
|
||||
{'msg': e.value, 'logline': line})
|
||||
# when py3 only: Drop the original AppArmorException by passing None as the parent exception
|
||||
raise AppArmorBug(ex_msg) # py3-only: from None
|
||||
except AppArmorException as e:
|
||||
ex_msg = ('%(msg)s\n\nThis error was caused by the log line:\n%(logline)s' %
|
||||
{'msg': e.value, 'logline': line})
|
||||
raise AppArmorBug(ex_msg) from None
|
||||
|
||||
self.LOG.close()
|
||||
self.logmark = ''
|
||||
|
||||
return self.hashlog
|
||||
|
@@ -14,7 +14,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
|
||||
from apparmor.rule.abi import AbiRule, AbiRuleset
|
||||
from apparmor.rule.capability import CapabilityRule, CapabilityRuleset
|
||||
@@ -108,14 +108,14 @@ class ProfileStorage:
|
||||
|
||||
# allow writing str or None to some keys
|
||||
elif key in ('flags', 'filename'):
|
||||
if type_is_str(value) or value is None:
|
||||
if type(value) is str or value is None:
|
||||
self.data[key] = value
|
||||
else:
|
||||
raise AppArmorBug('Attempt to change type of "%s" from %s to %s, value %s' % (key, type(self.data[key]), type(value), value))
|
||||
|
||||
# allow writing str values
|
||||
elif type_is_str(self.data[key]):
|
||||
if type_is_str(value):
|
||||
elif type(self.data[key]) is str:
|
||||
if type(value) is str:
|
||||
self.data[key] = value
|
||||
else:
|
||||
raise AppArmorBug('Attempt to change type of "%s" from %s to %s, value %s' % (key, type(self.data[key]), type(value), value))
|
||||
@@ -268,10 +268,10 @@ def split_flags(flags):
|
||||
def add_or_remove_flag(flags, flags_to_change, set_flag):
|
||||
'''add (if set_flag == True) or remove the given flags_to_change to flags'''
|
||||
|
||||
if type_is_str(flags) or flags is None:
|
||||
if type(flags) is str or flags is None:
|
||||
flags = split_flags(flags)
|
||||
|
||||
if type_is_str(flags_to_change) or flags_to_change is None:
|
||||
if type(flags_to_change) is str or flags_to_change is None:
|
||||
flags_to_change = split_flags(flags_to_change)
|
||||
|
||||
if set_flag:
|
||||
|
@@ -13,15 +13,17 @@
|
||||
#
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
from abc import abstractmethod
|
||||
|
||||
from apparmor.aare import AARE
|
||||
from apparmor.common import AppArmorBug, type_is_str
|
||||
from apparmor.common import AppArmorBug
|
||||
|
||||
# setup module translations
|
||||
from apparmor.translations import init_translation
|
||||
_ = init_translation()
|
||||
|
||||
|
||||
class BaseRule(object):
|
||||
class BaseRule:
|
||||
'''Base class to handle and store a single rule'''
|
||||
|
||||
# type specific rules should inherit from this class.
|
||||
@@ -76,7 +78,7 @@ class BaseRule(object):
|
||||
|
||||
if rulepart == self.ALL:
|
||||
return None, True
|
||||
elif type_is_str(rulepart):
|
||||
elif type(rulepart) is str:
|
||||
if len(rulepart.strip()) == 0:
|
||||
raise AppArmorBug('Passed empty %(partname)s to %(classname)s: %(rulepart)s' %
|
||||
{'partname': partname, 'classname': self.__class__.__name__, 'rulepart': str(rulepart)})
|
||||
@@ -104,8 +106,8 @@ class BaseRule(object):
|
||||
else:
|
||||
return False
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def _match(cls, raw_rule):
|
||||
'''parse raw_rule and return regex match object'''
|
||||
raise NotImplementedError("'%s' needs to implement _match(), but didn't" % (str(cls)))
|
||||
@@ -117,14 +119,14 @@ class BaseRule(object):
|
||||
rule.raw_rule = raw_rule.strip()
|
||||
return rule
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def _parse(cls, raw_rule):
|
||||
'''returns a Rule object created from parsing the raw rule.
|
||||
required to be implemented by subclasses; raise exception if not'''
|
||||
raise NotImplementedError("'%s' needs to implement _parse(), but didn't" % (str(cls)))
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def get_clean(self, depth=0):
|
||||
'''return clean rule (with default formatting, and leading whitespace as specified in the depth parameter)'''
|
||||
raise NotImplementedError("'%s' needs to implement get_clean(), but didn't" % (str(self.__class__)))
|
||||
@@ -157,7 +159,7 @@ class BaseRule(object):
|
||||
# still here? -> then the common part is covered, check rule-specific things now
|
||||
return self.is_covered_localvars(other_rule)
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def is_covered_localvars(self, other_rule):
|
||||
'''check if the rule-specific parts of other_rule is covered by this rule object'''
|
||||
raise NotImplementedError("'%s' needs to implement is_covered_localvars(), but didn't" % (str(self)))
|
||||
@@ -238,7 +240,7 @@ class BaseRule(object):
|
||||
# still here? -> then it is equal
|
||||
return True
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def is_equal_localvars(self, other_rule, strict):
|
||||
'''compare if rule-specific variables are equal'''
|
||||
raise NotImplementedError("'%s' needs to implement is_equal_localvars(), but didn't" % (str(self)))
|
||||
@@ -273,24 +275,24 @@ class BaseRule(object):
|
||||
|
||||
return headers
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def logprof_header_localvars(self):
|
||||
'''return the headers (human-readable version of the rule) to display in aa-logprof for this rule object
|
||||
returns {'label1': 'value1', 'label2': 'value2'} '''
|
||||
raise NotImplementedError("'%s' needs to implement logprof_header(), but didn't" % (str(self)))
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def edit_header(self):
|
||||
'''return the prompt for, and the path to edit when using '(N)ew' '''
|
||||
raise NotImplementedError("'%s' needs to implement edit_header(), but didn't" % (str(self)))
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def validate_edit(self, newpath):
|
||||
'''validate the new path.
|
||||
Returns True if it covers the previous path, False if it doesn't.'''
|
||||
raise NotImplementedError("'%s' needs to implement validate_edit(), but didn't" % (str(self)))
|
||||
|
||||
# @abstractmethod FIXME - uncomment when python3 only
|
||||
@abstractmethod
|
||||
def store_edit(self, newpath):
|
||||
'''store the changed path.
|
||||
This is done even if the new path doesn't match the original one.'''
|
||||
@@ -314,7 +316,7 @@ class BaseRule(object):
|
||||
return '%s%s' % (auditstr, allowstr)
|
||||
|
||||
|
||||
class BaseRuleset(object):
|
||||
class BaseRuleset:
|
||||
'''Base class to handle and store a collection of rules'''
|
||||
|
||||
# decides if the (G)lob and Glob w/ (E)xt options are displayed
|
||||
@@ -499,7 +501,7 @@ def check_and_split_list(lst, allowed_keywords, all_obj, classname, keyword_name
|
||||
|
||||
if lst == all_obj:
|
||||
return None, True, None
|
||||
elif type_is_str(lst):
|
||||
elif type(lst) is str:
|
||||
result_list = {lst}
|
||||
elif type(lst) in (list, tuple, set) and (len(lst) > 0 or allow_empty_list):
|
||||
result_list = set(lst)
|
||||
|
@@ -30,10 +30,9 @@ class AbiRule(IncludeRule):
|
||||
def __init__(self, path, ifexists, ismagic, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(AbiRule, self).__init__(path, ifexists, ismagic,
|
||||
audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(path, ifexists, ismagic,
|
||||
audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
# abi doesn't support 'if exists'
|
||||
if ifexists:
|
||||
|
@@ -13,7 +13,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
from apparmor.regex import RE_PROFILE_ALIAS, strip_quotes
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_comment, quote_if_needed
|
||||
|
||||
# setup module translations
|
||||
@@ -29,10 +29,8 @@ class AliasRule(BaseRule):
|
||||
def __init__(self, orig_path, target, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(AliasRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
# aliass don't support audit or deny
|
||||
if audit:
|
||||
@@ -40,14 +38,14 @@ class AliasRule(BaseRule):
|
||||
if deny:
|
||||
raise AppArmorBug('Attempt to initialize %s with deny flag' % self.__class__.__name__)
|
||||
|
||||
if not type_is_str(orig_path):
|
||||
if type(orig_path) is not str:
|
||||
raise AppArmorBug('Passed unknown type for orig_path to %s: %s' % (self.__class__.__name__, orig_path))
|
||||
if not orig_path:
|
||||
raise AppArmorException('Passed empty orig_path to %s: %s' % (self.__class__.__name__, orig_path))
|
||||
if not orig_path.startswith('/'):
|
||||
raise AppArmorException("Alias path doesn't start with '/'")
|
||||
|
||||
if not type_is_str(target):
|
||||
if type(target) is not str:
|
||||
raise AppArmorBug('Passed unknown type for target to %s: %s' % (self.__class__.__name__, target))
|
||||
if not target:
|
||||
raise AppArmorException('Passed empty target to %s: %s' % (self.__class__.__name__, target))
|
||||
|
@@ -14,7 +14,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
from apparmor.regex import RE_PROFILE_BOOLEAN
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_comment
|
||||
|
||||
# setup module translations
|
||||
@@ -30,10 +30,8 @@ class BooleanRule(BaseRule):
|
||||
def __init__(self, varname, value, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(BooleanRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
# boolean variables don't support audit or deny
|
||||
if audit:
|
||||
@@ -41,12 +39,12 @@ class BooleanRule(BaseRule):
|
||||
if deny:
|
||||
raise AppArmorBug('Attempt to initialize %s with deny flag' % self.__class__.__name__)
|
||||
|
||||
if not type_is_str(varname):
|
||||
if type(varname) is not str:
|
||||
raise AppArmorBug('Passed unknown type for boolean variable to %s: %s' % (self.__class__.__name__, varname))
|
||||
if not varname.startswith('$'):
|
||||
raise AppArmorException("Passed invalid boolean to %s (doesn't start with '$'): %s" % (self.__class__.__name__, varname))
|
||||
|
||||
if not type_is_str(value):
|
||||
if type(value) is not str:
|
||||
raise AppArmorBug('Passed unknown type for value to %s: %s' % (self.__class__.__name__, value))
|
||||
if not value:
|
||||
raise AppArmorException('Passed empty value to %s: %s' % (self.__class__.__name__, value))
|
||||
@@ -131,4 +129,4 @@ class BooleanRuleset(BaseRuleset):
|
||||
if rule.varname == knownrule.varname:
|
||||
raise AppArmorException(_('Redefining existing variable %(variable)s: %(value)s') % { 'variable': rule.varname, 'value': rule.value })
|
||||
|
||||
super(BooleanRuleset, self).add(rule, cleanup)
|
||||
super().add(rule, cleanup)
|
||||
|
@@ -16,7 +16,7 @@
|
||||
import re
|
||||
|
||||
from apparmor.regex import RE_PROFILE_CAP
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, logprof_value_or_all, parse_modifiers
|
||||
|
||||
# setup module translations
|
||||
@@ -29,7 +29,7 @@ class CapabilityRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field CapabilityRule.ALL
|
||||
class __CapabilityAll(object):
|
||||
class __CapabilityAll:
|
||||
pass
|
||||
|
||||
ALL = __CapabilityAll
|
||||
@@ -39,10 +39,8 @@ class CapabilityRule(BaseRule):
|
||||
def __init__(self, cap_list, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(CapabilityRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
# Because we support having multiple caps in one rule,
|
||||
# initializer needs to accept a list of caps.
|
||||
self.all_caps = False
|
||||
@@ -50,7 +48,7 @@ class CapabilityRule(BaseRule):
|
||||
self.all_caps = True
|
||||
self.capability = set()
|
||||
else:
|
||||
if type_is_str(cap_list):
|
||||
if type(cap_list) is str:
|
||||
self.capability = {cap_list}
|
||||
elif type(cap_list) == list and len(cap_list) > 0:
|
||||
self.capability = set(cap_list)
|
||||
|
@@ -14,7 +14,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
from apparmor.regex import RE_PROFILE_CHANGE_PROFILE, strip_quotes
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_modifiers, logprof_value_or_all, quote_if_needed
|
||||
|
||||
# setup module translations
|
||||
@@ -27,7 +27,7 @@ class ChangeProfileRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field ChangeProfileRule.ALL
|
||||
class __ChangeProfileAll(object):
|
||||
class __ChangeProfileAll:
|
||||
pass
|
||||
|
||||
ALL = __ChangeProfileAll
|
||||
@@ -43,10 +43,8 @@ class ChangeProfileRule(BaseRule):
|
||||
CHANGE_PROFILE RULE = 'change_profile' [ [ EXEC MODE ] EXEC COND ] [ -> PROGRAMCHILD ]
|
||||
'''
|
||||
|
||||
super(ChangeProfileRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
if execmode:
|
||||
if execmode != 'safe' and execmode != 'unsafe':
|
||||
@@ -59,7 +57,7 @@ class ChangeProfileRule(BaseRule):
|
||||
self.all_execconds = False
|
||||
if execcond == ChangeProfileRule.ALL:
|
||||
self.all_execconds = True
|
||||
elif type_is_str(execcond):
|
||||
elif type(execcond) is str:
|
||||
if not execcond.strip():
|
||||
raise AppArmorBug('Empty exec condition in change_profile rule')
|
||||
elif execcond.startswith('/') or execcond.startswith('@'):
|
||||
@@ -73,7 +71,7 @@ class ChangeProfileRule(BaseRule):
|
||||
self.all_targetprofiles = False
|
||||
if targetprofile == ChangeProfileRule.ALL:
|
||||
self.all_targetprofiles = True
|
||||
elif type_is_str(targetprofile):
|
||||
elif type(targetprofile) is str:
|
||||
if targetprofile.strip():
|
||||
self.targetprofile = targetprofile
|
||||
else:
|
||||
|
@@ -68,7 +68,7 @@ class DbusRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field DbusRule.ALL
|
||||
class __DbusAll(object):
|
||||
class __DbusAll:
|
||||
pass
|
||||
|
||||
ALL = __DbusAll
|
||||
@@ -78,10 +78,8 @@ class DbusRule(BaseRule):
|
||||
def __init__(self, access, bus, path, name, interface, member, peername, peerlabel,
|
||||
audit=False, deny=False, allow_keyword=False, comment='', log_event=None):
|
||||
|
||||
super(DbusRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
self.access, self.all_access, unknown_items = check_and_split_list(access, access_keywords, DbusRule.ALL, 'DbusRule', 'access')
|
||||
if unknown_items:
|
||||
|
@@ -14,7 +14,7 @@
|
||||
|
||||
from apparmor.aare import AARE
|
||||
from apparmor.regex import RE_PROFILE_FILE_ENTRY, strip_quotes
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, check_and_split_list, logprof_value_or_all, parse_modifiers, quote_if_needed
|
||||
|
||||
# setup module translations
|
||||
@@ -34,9 +34,9 @@ class FileRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field FileRule.ALL
|
||||
class __FileAll(object):
|
||||
class __FileAll:
|
||||
pass
|
||||
class __FileAnyExec(object):
|
||||
class __FileAnyExec:
|
||||
pass
|
||||
|
||||
ALL = __FileAll
|
||||
@@ -58,8 +58,8 @@ class FileRule(BaseRule):
|
||||
- leading_perms: bool
|
||||
'''
|
||||
|
||||
super(FileRule, self).__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
# rulepart partperms is_path log_event
|
||||
self.path, self.all_paths = self._aare_or_all(path, 'path', True, log_event)
|
||||
@@ -69,7 +69,7 @@ class FileRule(BaseRule):
|
||||
self.can_glob_ext = not self.all_paths
|
||||
self.can_edit = not self.all_paths
|
||||
|
||||
if type_is_str(perms):
|
||||
if type(perms) is str:
|
||||
perms, tmp_exec_perms = split_perms(perms, deny)
|
||||
if tmp_exec_perms:
|
||||
raise AppArmorBug('perms must not contain exec perms')
|
||||
@@ -96,7 +96,7 @@ class FileRule(BaseRule):
|
||||
raise AppArmorBug("link rules can't have execute permissions")
|
||||
elif exec_perms == self.ANY_EXEC:
|
||||
self.exec_perms = exec_perms
|
||||
elif type_is_str(exec_perms):
|
||||
elif type(exec_perms) is str:
|
||||
if deny:
|
||||
if exec_perms != 'x':
|
||||
raise AppArmorException(_("file deny rules only allow to use 'x' as execute mode, but not %s" % exec_perms))
|
||||
|
@@ -13,7 +13,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
from apparmor.regex import RE_INCLUDE, re_match_include_parse
|
||||
from apparmor.common import AppArmorBug, AppArmorException, is_skippable_file, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException, is_skippable_file
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_comment
|
||||
import os
|
||||
|
||||
@@ -30,10 +30,8 @@ class IncludeRule(BaseRule):
|
||||
def __init__(self, path, ifexists, ismagic, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(IncludeRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
# include doesn't support audit or deny
|
||||
if audit:
|
||||
@@ -45,7 +43,7 @@ class IncludeRule(BaseRule):
|
||||
raise AppArmorBug('Passed unknown type for ifexists to %s: %s' % (self.__class__.__name__, ifexists))
|
||||
if type(ismagic) is not bool:
|
||||
raise AppArmorBug('Passed unknown type for ismagic to %s: %s' % (self.__class__.__name__, ismagic))
|
||||
if not type_is_str(path):
|
||||
if type(path) is not str:
|
||||
raise AppArmorBug('Passed unknown type for path to %s: %s' % (self.__class__.__name__, path))
|
||||
if not path:
|
||||
raise AppArmorBug('Passed empty path to %s: %s' % (self.__class__.__name__, path))
|
||||
|
@@ -16,7 +16,7 @@
|
||||
import re
|
||||
|
||||
from apparmor.regex import RE_PROFILE_NETWORK
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, logprof_value_or_all, parse_modifiers
|
||||
|
||||
# setup module translations
|
||||
@@ -49,7 +49,7 @@ class NetworkRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field NetworkRule.ALL
|
||||
class __NetworkAll(object):
|
||||
class __NetworkAll:
|
||||
pass
|
||||
|
||||
ALL = __NetworkAll
|
||||
@@ -59,16 +59,14 @@ class NetworkRule(BaseRule):
|
||||
def __init__(self, domain, type_or_protocol, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(NetworkRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
self.domain = None
|
||||
self.all_domains = False
|
||||
if domain == NetworkRule.ALL:
|
||||
self.all_domains = True
|
||||
elif type_is_str(domain):
|
||||
elif type(domain) is str:
|
||||
if domain in network_domain_keywords:
|
||||
self.domain = domain
|
||||
else:
|
||||
@@ -80,7 +78,7 @@ class NetworkRule(BaseRule):
|
||||
self.all_type_or_protocols = False
|
||||
if type_or_protocol == NetworkRule.ALL:
|
||||
self.all_type_or_protocols = True
|
||||
elif type_is_str(type_or_protocol):
|
||||
elif type(type_or_protocol) is str:
|
||||
if type_or_protocol in network_protocol_keywords:
|
||||
self.type_or_protocol = type_or_protocol
|
||||
elif type_or_protocol in network_type_keywords:
|
||||
|
@@ -45,7 +45,7 @@ class PtraceRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field PtraceRule.ALL
|
||||
class __PtraceAll(object):
|
||||
class __PtraceAll:
|
||||
pass
|
||||
|
||||
ALL = __PtraceAll
|
||||
@@ -55,10 +55,8 @@ class PtraceRule(BaseRule):
|
||||
def __init__(self, access, peer, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(PtraceRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
self.access, self.all_access, unknown_items = check_and_split_list(access, access_keywords, PtraceRule.ALL, 'PtraceRule', 'access')
|
||||
if unknown_items:
|
||||
|
@@ -16,7 +16,7 @@
|
||||
import re
|
||||
|
||||
from apparmor.regex import RE_PROFILE_RLIMIT, strip_quotes
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_comment, quote_if_needed
|
||||
|
||||
# setup module translations
|
||||
@@ -41,7 +41,7 @@ class RlimitRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field RlimitRule.ALL
|
||||
class __RlimitAll(object):
|
||||
class __RlimitAll:
|
||||
pass
|
||||
|
||||
ALL = __RlimitAll
|
||||
@@ -51,15 +51,13 @@ class RlimitRule(BaseRule):
|
||||
def __init__(self, rlimit, value, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(RlimitRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
if audit or deny or allow_keyword:
|
||||
raise AppArmorBug('The audit, allow or deny keywords are not allowed in rlimit rules.')
|
||||
|
||||
if type_is_str(rlimit):
|
||||
if type(rlimit) is str:
|
||||
if rlimit in rlimit_all:
|
||||
self.rlimit = rlimit
|
||||
else:
|
||||
@@ -72,7 +70,7 @@ class RlimitRule(BaseRule):
|
||||
self.all_values = False
|
||||
if value == RlimitRule.ALL:
|
||||
self.all_values = True
|
||||
elif type_is_str(value):
|
||||
elif type(value) is str:
|
||||
if not value.strip():
|
||||
raise AppArmorBug('Empty value in rlimit rule')
|
||||
|
||||
|
@@ -66,7 +66,7 @@ class SignalRule(BaseRule):
|
||||
|
||||
# Nothing external should reference this class, all external users
|
||||
# should reference the class field SignalRule.ALL
|
||||
class __SignalAll(object):
|
||||
class __SignalAll:
|
||||
pass
|
||||
|
||||
ALL = __SignalAll
|
||||
@@ -76,10 +76,8 @@ class SignalRule(BaseRule):
|
||||
def __init__(self, access, signal, peer, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(SignalRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
self.access, self.all_access, unknown_items = check_and_split_list(access, access_keywords, SignalRule.ALL, 'SignalRule', 'access')
|
||||
if unknown_items:
|
||||
|
@@ -14,7 +14,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
from apparmor.regex import RE_PROFILE_VARIABLE, strip_quotes
|
||||
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_comment, quote_if_needed
|
||||
|
||||
import re
|
||||
@@ -32,10 +32,8 @@ class VariableRule(BaseRule):
|
||||
def __init__(self, varname, mode, values, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
|
||||
super(VariableRule, self).__init__(audit=audit, deny=deny,
|
||||
allow_keyword=allow_keyword,
|
||||
comment=comment,
|
||||
log_event=log_event)
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
# variables don't support audit or deny
|
||||
if audit:
|
||||
@@ -43,14 +41,14 @@ class VariableRule(BaseRule):
|
||||
if deny:
|
||||
raise AppArmorBug('Attempt to initialize %s with deny flag' % self.__class__.__name__)
|
||||
|
||||
if not type_is_str(varname):
|
||||
if type(varname) is not str:
|
||||
raise AppArmorBug('Passed unknown type for varname to %s: %s' % (self.__class__.__name__, varname))
|
||||
if not varname.startswith('@{'):
|
||||
raise AppArmorException("Passed invalid varname to %s (doesn't start with '@{'): %s" % (self.__class__.__name__, varname))
|
||||
if not varname.endswith('}'):
|
||||
raise AppArmorException("Passed invalid varname to %s (doesn't end with '}'): %s" % (self.__class__.__name__, varname))
|
||||
|
||||
if not type_is_str(mode):
|
||||
if type(mode) is not str:
|
||||
raise AppArmorBug('Passed unknown type for variable assignment mode to %s: %s' % (self.__class__.__name__, mode))
|
||||
if mode not in ('=', '+='):
|
||||
raise AppArmorBug('Passed unknown variable assignment mode to %s: %s' % (self.__class__.__name__, mode))
|
||||
@@ -151,7 +149,7 @@ class VariableRuleset(BaseRuleset):
|
||||
if rule.varname == knownrule.varname:
|
||||
raise AppArmorException(_('Redefining existing variable %(variable)s: %(value)s') % { 'variable': rule.varname, 'value': rule.values })
|
||||
|
||||
super(VariableRuleset, self).add(rule, cleanup)
|
||||
super().add(rule, cleanup)
|
||||
|
||||
def get_merged_variables(self):
|
||||
''' Get merged variables of this VariableRuleset.
|
||||
|
@@ -8,7 +8,7 @@
|
||||
#
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
class _Raw_Rule(object):
|
||||
class _Raw_Rule:
|
||||
audit = False
|
||||
deny = False
|
||||
|
||||
|
@@ -9,7 +9,6 @@
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
from apparmor.common import AppArmorException, debug, error, msg, cmd
|
||||
from apparmor.aa import which
|
||||
import apparmor.easyprof
|
||||
import optparse
|
||||
import os
|
||||
@@ -18,8 +17,10 @@ import re
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from shutil import which
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
|
||||
def check_requirements(binary):
|
||||
'''Verify necessary software is installed'''
|
||||
@@ -128,11 +129,8 @@ def aa_exec(command, opt, environ={}, verify_rules=[]):
|
||||
policy = easyp.gen_policy(**params)
|
||||
debug("\n%s" % policy)
|
||||
|
||||
with tempfile.NamedTemporaryFile(prefix='%s-' % policy_name) as tmp:
|
||||
if sys.version_info[0] >= 3:
|
||||
tmp.write(bytes(policy, 'utf-8'))
|
||||
else:
|
||||
tmp.write(policy)
|
||||
with NamedTemporaryFile(prefix='%s-' % policy_name) as tmp:
|
||||
tmp.write(policy.encode('utf-8'))
|
||||
|
||||
debug("using '%s' template" % opt.template)
|
||||
# TODO: get rid of this
|
||||
@@ -540,14 +538,9 @@ Section "ServerLayout"
|
||||
InputDevice "NoKeyboard"
|
||||
EndSection
|
||||
'''
|
||||
|
||||
tmp, xorg_conf = tempfile.mkstemp(prefix='aa-sandbox-xorg.conf-')
|
||||
self.tempfiles.append(xorg_conf)
|
||||
if sys.version_info[0] >= 3:
|
||||
os.write(tmp, bytes(conf, 'utf-8'))
|
||||
else:
|
||||
os.write(tmp, conf)
|
||||
os.close(tmp)
|
||||
with NamedTemporaryFile('wb', prefix='aa-sandbox-xorg.conf-', delete=False) as tmp:
|
||||
self.tempfiles.append(tmp.name)
|
||||
tmp.write(conf.encode('utf-8'))
|
||||
|
||||
xvfb_args.append('--xvfb=Xorg')
|
||||
xvfb_args.append('-dpi 96') # https://www.xpra.org/trac/ticket/163
|
||||
@@ -555,10 +548,10 @@ EndSection
|
||||
xvfb_args.append('-noreset')
|
||||
xvfb_args.append('-logfile %s' % os.path.expanduser('~/.xpra/%s.log' % self.display))
|
||||
xvfb_args.append('-auth %s' % self.new_environ['XAUTHORITY'])
|
||||
xvfb_args.append('-config %s' % xorg_conf)
|
||||
extensions = ['Composite', 'GLX', 'RANDR', 'RENDER', 'SECURITY']
|
||||
for i in extensions:
|
||||
xvfb_args.append('+extension %s' % i)
|
||||
xvfb_args.append('-config %s' % tmp.name)
|
||||
xvfb_args.extend(
|
||||
'+extension %s' % i for i in ('Composite', 'GLX', 'RANDR', 'RENDER', 'SECURITY')
|
||||
)
|
||||
else:
|
||||
raise AppArmorException("Unsupported X driver '%s'" % self.driver)
|
||||
|
||||
|
@@ -11,11 +11,10 @@
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# ----------------------------------------------------------------------
|
||||
from __future__ import with_statement
|
||||
import re
|
||||
from apparmor.common import AppArmorException, open_file_read, warn, convert_regexp # , msg, error, debug
|
||||
|
||||
class Severity(object):
|
||||
class Severity:
|
||||
def __init__(self, dbname=None, default_rank=10):
|
||||
"""Initialises the class object"""
|
||||
self.PROF_DIR = '/etc/apparmor.d' # The profile directory
|
||||
|
@@ -14,6 +14,7 @@
|
||||
# ----------------------------------------------------------------------
|
||||
import os
|
||||
import sys
|
||||
from shutil import which
|
||||
|
||||
import apparmor.aa as apparmor
|
||||
import apparmor.ui as aaui
|
||||
@@ -61,9 +62,9 @@ class aa_tools:
|
||||
program = fq_path
|
||||
profile = apparmor.get_profile_filename_from_attachment(fq_path, True)
|
||||
else:
|
||||
which = apparmor.which(p)
|
||||
if which is not None:
|
||||
program = apparmor.get_full_path(which)
|
||||
which_ = which(p)
|
||||
if which_ is not None:
|
||||
program = apparmor.get_full_path(which_)
|
||||
profile = apparmor.get_profile_filename_from_attachment(program, True)
|
||||
elif os.path.exists(os.path.join(apparmor.profile_dir, p)):
|
||||
program = None
|
||||
|
@@ -19,8 +19,8 @@ import sys
|
||||
import re
|
||||
import readline
|
||||
import os
|
||||
import tempfile
|
||||
import subprocess
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from apparmor.common import readkey, AppArmorException, DebugLogger
|
||||
|
||||
@@ -31,10 +31,6 @@ _ = init_translation()
|
||||
# Set up UI logger for separate messages from UI module
|
||||
debug_logger = DebugLogger('UI')
|
||||
|
||||
# If Python3, wrap input in raw_input so make check passes
|
||||
if 'raw_input' not in dir(__builtins__):
|
||||
raw_input = input
|
||||
|
||||
ARROWS = {'A': 'UP', 'B': 'DOWN', 'C': 'RIGHT', 'D': 'LEFT'}
|
||||
|
||||
UI_mode = 'text'
|
||||
@@ -70,7 +66,7 @@ def set_text_mode():
|
||||
# reads the response on command line for json and verifies the response
|
||||
# for the dialog type
|
||||
def json_response(dialog_type):
|
||||
string = raw_input('\n')
|
||||
string = input('\n')
|
||||
rh = json.loads(string.strip())
|
||||
if rh["dialog"] != dialog_type:
|
||||
raise AppArmorException('Expected response %s got %s.' % (dialog_type, string))
|
||||
@@ -222,7 +218,7 @@ def UI_GetString(text, default):
|
||||
else: # text mode
|
||||
readline.set_startup_hook(lambda: readline.insert_text(default))
|
||||
try:
|
||||
string = raw_input('\n' + text)
|
||||
string = input('\n' + text)
|
||||
except EOFError:
|
||||
string = ''
|
||||
finally:
|
||||
@@ -253,34 +249,29 @@ def UI_BusyStop():
|
||||
|
||||
|
||||
def diff(oldprofile, newprofile):
|
||||
difftemp = tempfile.NamedTemporaryFile('w')
|
||||
difftemp = NamedTemporaryFile('w')
|
||||
subprocess.call('diff -u -p %s %s > %s' % (oldprofile, newprofile, difftemp.name), shell=True)
|
||||
return difftemp
|
||||
|
||||
|
||||
def write_profile_to_tempfile(profile):
|
||||
temp = tempfile.NamedTemporaryFile('w')
|
||||
temp = NamedTemporaryFile('w')
|
||||
temp.write(profile)
|
||||
temp.flush()
|
||||
return temp
|
||||
|
||||
|
||||
def generate_diff(oldprofile, newprofile):
|
||||
oldtemp = write_profile_to_tempfile(oldprofile)
|
||||
newtemp = write_profile_to_tempfile(newprofile)
|
||||
difftemp = diff(oldtemp.name, newtemp.name)
|
||||
oldtemp.close()
|
||||
newtemp.close()
|
||||
return difftemp
|
||||
with write_profile_to_tempfile(oldprofile) as oldtemp, \
|
||||
write_profile_to_tempfile(newprofile) as newtemp:
|
||||
return diff(oldtemp.name, newtemp.name)
|
||||
|
||||
|
||||
def generate_diff_with_comments(oldprofile, newprofile):
|
||||
if not os.path.exists(oldprofile):
|
||||
raise AppArmorException(_("Can't find existing profile %s to compare changes.") % oldprofile)
|
||||
newtemp = write_profile_to_tempfile(newprofile)
|
||||
difftemp = diff(oldprofile, newtemp.name)
|
||||
newtemp.close()
|
||||
return difftemp
|
||||
with write_profile_to_tempfile(newprofile) as newtemp:
|
||||
return diff(oldprofile, newtemp.name)
|
||||
|
||||
|
||||
def UI_Changes(oldprofile, newprofile, comments=False):
|
||||
@@ -290,8 +281,8 @@ def UI_Changes(oldprofile, newprofile, comments=False):
|
||||
else:
|
||||
difftemp = generate_diff_with_comments(oldprofile, newprofile)
|
||||
header = 'View Changes with comments'
|
||||
UI_ShowFile(header, difftemp.name)
|
||||
difftemp.close()
|
||||
with difftemp:
|
||||
UI_ShowFile(header, difftemp.name)
|
||||
|
||||
def UI_ShowFile(header, filename):
|
||||
if UI_mode == 'json':
|
||||
@@ -362,7 +353,7 @@ CMDS = {'CMD_ALLOW': _('(A)llow'),
|
||||
}
|
||||
|
||||
|
||||
class PromptQuestion(object):
|
||||
class PromptQuestion:
|
||||
title = None
|
||||
headers = None
|
||||
explanation = None
|
||||
|
@@ -26,11 +26,11 @@ import os
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
class Install(_install, object):
|
||||
class Install(_install):
|
||||
'''Override setuptools to install the files where we want them.'''
|
||||
def run(self):
|
||||
# Now byte-compile everything
|
||||
super(Install, self).run()
|
||||
super().run()
|
||||
|
||||
prefix = self.prefix
|
||||
if self.root != None:
|
||||
|
@@ -21,7 +21,7 @@ COMMONDIR=../../common/
|
||||
include $(COMMONDIR)/Make.rules
|
||||
|
||||
# files that don't have 100% test coverage
|
||||
INCOMPLETE_COVERAGE=libraries/libapparmor|utils/apparmor/aa.py|utils/apparmor/common.py|utils/apparmor/config.py|utils/apparmor/easyprof.py|utils/apparmor/fail.py|utils/apparmor/logparser.py|utils/apparmor/profile_storage.py|utils/apparmor/rules.py|utils/apparmor/ui.py|minitools_test.py
|
||||
INCOMPLETE_COVERAGE=libraries/libapparmor/swig/python/.*/LibAppArmor/LibAppArmor.py|utils/apparmor/aa.py|utils/apparmor/common.py|utils/apparmor/config.py|utils/apparmor/easyprof.py|utils/apparmor/fail.py|utils/apparmor/logparser.py|utils/apparmor/profile_storage.py|utils/apparmor/rules.py|utils/apparmor/ui.py|minitools_test.py
|
||||
|
||||
|
||||
ifdef USE_SYSTEM
|
||||
@@ -31,8 +31,7 @@ ifdef USE_SYSTEM
|
||||
BASEDIR=
|
||||
PARSER=
|
||||
else
|
||||
# PYTHON_DIST_BUILD_PATH based on libapparmor/swig/python/test/Makefile.am
|
||||
PYTHON_DIST_BUILD_PATH = ../../libraries/libapparmor/swig/python/build/$$($(PYTHON) -c "import sysconfig; print(\"lib.%s-%s\" %(sysconfig.get_platform(), sysconfig.get_python_version()))")
|
||||
PYTHON_DIST_BUILD_PATH = ../../libraries/libapparmor/swig/python/build/$$($(PYTHON) ../../libraries/libapparmor/swig/python/test/buildpath.py)
|
||||
LIBAPPARMOR_PATH=../../libraries/libapparmor/src/.libs/
|
||||
LD_LIBRARY_PATH=$(LIBAPPARMOR_PATH):$(PYTHON_DIST_BUILD_PATH)
|
||||
PYTHONPATH=..:$(PYTHON_DIST_BUILD_PATH)
|
||||
|
@@ -16,7 +16,7 @@ For more information, refer to the [unittest documentation](https://docs.python.
|
||||
Make sure to set the environment variables pointing to the in-tree apparmor modules, and the in-tree libapparmor and its python wrapper:
|
||||
|
||||
```bash
|
||||
$ export PYTHONPATH=..:../../libraries/libapparmor/swig/python/build/$(/usr/bin/python3 -c "import sysconfig; print(\"lib.%s-%s\" %(sysconfig.get_platform(), sysconfig.get_python_version()))")
|
||||
$ export PYTHONPATH=..:../../libraries/libapparmor/swig/python/build/$(/usr/bin/python3 ../../libraries/libapparmor/swig/python/test/buildpath.py)
|
||||
$ export __AA_CONFDIR=.
|
||||
```
|
||||
|
||||
@@ -24,4 +24,4 @@ To execute the test individually, run:
|
||||
|
||||
```bash
|
||||
$ python3 ./test-tile.py ClassFoo.test_bar
|
||||
```
|
||||
```
|
||||
|
@@ -12,10 +12,10 @@
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import tempfile
|
||||
import unittest
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
# The locationg of the aa-decode utility can be overridden by setting
|
||||
# The location of the aa-decode utility can be overridden by setting
|
||||
# the APPARMOR_DECODE environment variable; this is useful for running
|
||||
# these tests in an installed environment
|
||||
aadecode_bin = "../aa-decode"
|
||||
@@ -57,27 +57,8 @@ def cmd(command, stdin=None):
|
||||
return sp.returncode, out.decode('utf-8')
|
||||
|
||||
|
||||
def mkstemp_fill(contents, suffix='', prefix='tst-aadecode-', dir=None):
|
||||
'''As tempfile.mkstemp does, return a (file, name) pair, but with prefilled contents.'''
|
||||
|
||||
handle, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
|
||||
os.close(handle)
|
||||
handle = open(name, "w+")
|
||||
handle.write(contents)
|
||||
handle.flush()
|
||||
handle.seek(0)
|
||||
|
||||
return handle, name
|
||||
|
||||
class AADecodeTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.tmpfile = None
|
||||
|
||||
def tearDown(self):
|
||||
if self.tmpfile and os.path.exists(self.tmpfile):
|
||||
os.remove(self.tmpfile)
|
||||
|
||||
def test_help(self):
|
||||
'''Test --help argument'''
|
||||
|
||||
@@ -92,15 +73,18 @@ class AADecodeTest(unittest.TestCase):
|
||||
|
||||
expected_return_code = 0
|
||||
|
||||
(f, self.tmpfile) = mkstemp_fill(content)
|
||||
with NamedTemporaryFile("w+", prefix='tst-aadecode-') as temp_file:
|
||||
self.tmpfile = temp_file.name
|
||||
temp_file.write(content)
|
||||
temp_file.flush()
|
||||
temp_file.seek(0)
|
||||
rc, report = cmd((aadecode_bin,), stdin=temp_file)
|
||||
|
||||
rc, report = cmd((aadecode_bin,), stdin=f)
|
||||
result = 'Got exit code %d, expected %d\n' % (rc, expected_return_code)
|
||||
self.assertEqual(expected_return_code, rc, result + report)
|
||||
for expected_string in expected:
|
||||
result = 'could not find expected %s in output:\n' % (expected_string)
|
||||
self.assertIn(expected_string, report, result + report)
|
||||
f.close()
|
||||
|
||||
def test_simple_decode(self):
|
||||
'''Test simple decode on command line'''
|
||||
|
@@ -58,7 +58,7 @@ class InterceptingOptionParser(optparse.OptionParser):
|
||||
raise InterceptedError(error_message=msg)
|
||||
|
||||
|
||||
class Manifest(object):
|
||||
class Manifest:
|
||||
def __init__(self, profile_name):
|
||||
self.security = dict()
|
||||
self.security['profiles'] = dict()
|
||||
|
@@ -13,9 +13,9 @@
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from common_test import AATest, setup_all_loops, setup_aa
|
||||
import apparmor.aa as aa
|
||||
@@ -112,10 +112,9 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.875891] audit: type=1400 audit({epoc
|
||||
Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoch}:122): apparmor="ALLOWED" operation="file_mmap" profile="libreoffice-soffice//null-/usr/bin/file" name="/usr/bin/file" pid=4111 comm="file" requested_mask="rm" denied_mask="rm" fsuid=1001 ouid=0
|
||||
'''.format(epoch=round(time.time(), 3))
|
||||
|
||||
handle, self.test_logfile = tempfile.mkstemp(prefix='test-aa-notify-')
|
||||
os.close(handle)
|
||||
with open(self.test_logfile, "w+") as handle:
|
||||
handle.write(
|
||||
with NamedTemporaryFile("w+", prefix='test-aa-notify-', delete=False) as temp_file:
|
||||
self.test_logfile = temp_file.name
|
||||
temp_file.write(
|
||||
test_logfile_contents_999_days_old +
|
||||
test_logfile_contents_30_days_old +
|
||||
test_logfile_contents_unrelevant_entries +
|
||||
|
@@ -15,7 +15,6 @@ from common_test import read_file, write_file
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
import apparmor.aa # needed to set global vars in some tests
|
||||
from apparmor.aa import (check_for_apparmor, get_output, get_reqs, get_interpreter_and_abstraction, create_new_profile,
|
||||
@@ -99,14 +98,7 @@ class AATest_get_reqs(AATest):
|
||||
)
|
||||
|
||||
def _run_test(self, params, expected):
|
||||
# for some reason, setting the ldd config option does not get
|
||||
# honored in python2.7
|
||||
# XXX KILL when python 2.7 is dropped XXX
|
||||
if sys.version_info[0] < 3:
|
||||
print("Skipping on python < 3.x")
|
||||
return
|
||||
apparmor.aa.cfg['settings']['ldd'] = './fake_ldd'
|
||||
|
||||
self.assertEqual(get_reqs(params), expected)
|
||||
|
||||
class AaTest_create_new_profile(AATest):
|
||||
@@ -118,12 +110,6 @@ class AaTest_create_new_profile(AATest):
|
||||
)
|
||||
def _run_test(self, params, expected):
|
||||
apparmor.aa.cfg['settings']['ldd'] = './fake_ldd'
|
||||
# for some reason, setting the ldd config option does not get
|
||||
# honored in python2.7
|
||||
# XXX KILL when python 2.7 is dropped XXX
|
||||
if sys.version_info[0] < 3:
|
||||
print("Skipping on python < 3.x")
|
||||
return
|
||||
|
||||
self.createTmpdir()
|
||||
|
||||
|
@@ -13,19 +13,7 @@ import unittest
|
||||
from common_test import AATest, setup_all_loops
|
||||
from apparmor.common import AppArmorBug
|
||||
|
||||
from apparmor.common import type_is_str, split_name, combine_profname
|
||||
|
||||
class TestIs_str_type(AATest):
|
||||
tests = (
|
||||
('foo', True),
|
||||
(u'foo', True),
|
||||
(42, False),
|
||||
(True, False),
|
||||
([], False),
|
||||
)
|
||||
|
||||
def _run_test(self, params, expected):
|
||||
self.assertEqual(type_is_str(params), expected)
|
||||
from apparmor.common import split_name, combine_profname
|
||||
|
||||
class AaTest_split_name(AATest):
|
||||
tests = (
|
||||
|
@@ -14,16 +14,13 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import unittest
|
||||
from common_test import AATest, setup_all_loops, setup_aa
|
||||
|
||||
import apparmor.aa as apparmor
|
||||
from common_test import read_file
|
||||
|
||||
python_interpreter = 'python'
|
||||
if sys.version_info >= (3, 0):
|
||||
python_interpreter = 'python3'
|
||||
python_interpreter = 'python3'
|
||||
|
||||
class MinitoolsTest(AATest):
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/python
|
||||
#!/usr/bin/python3
|
||||
#
|
||||
# Copyright (C) 2012 Canonical Ltd.
|
||||
#
|
||||
@@ -9,7 +9,6 @@
|
||||
# Written by Steve Beattie <steve@nxnw.org>, based on work by
|
||||
# Christian Boltz <apparmor@cboltz.de>
|
||||
|
||||
from __future__ import with_statement
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
Reference in New Issue
Block a user