2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-31 06:16:03 +00:00

Codebase update 2

This commit is contained in:
Kshitij Gupta
2013-07-09 03:46:26 +05:30
parent 58f48db381
commit ccee5cd5e0
5 changed files with 325 additions and 53 deletions

View File

@@ -6,7 +6,6 @@ Created on Jun 21, 2013
import sys
import unittest
sys.path.append('../')
sys.path.append('../apparmor')
import apparmor.severity as severity
@@ -17,7 +16,6 @@ class Test(unittest.TestCase):
s = severity.Severity('severity.db')
rank = s.rank('/dev/doublehit', 'i')
self.assertEqual(rank, 10, 'Wrong')
broken = severity.Severity('severity_broken.db')
try:
broken = severity.Severity('severity_broken.db')
except AppArmorException:

View File

@@ -1,33 +1,35 @@
#711
#1082
#382-430
#480-525
#global variable names corruption
from __future__ import with_statement
import inspect
import logging
import os
import re
import subprocess
import sys
import traceback
import atexit
import apparmor.config
import apparmor.severity
import LibAppArmor
from apparmor.common import (AppArmorException, error, debug, msg,
open_file_read, readkey, valid_path,
hasher, open_file_write)
DEBUGGING = False
debug_logger = None
# Setup logging incase of debugging is enabled
if os.getenv('LOGPROF_DEBUG', False):
import logging, atexit
DEBUGGING = True
logprof_debug = os.environ['LOGPROF_DEBUG']
logging.basicConfig(filename=logprof_debug, level=logging.DEBUG)
debug_logger = logging.getLogger('logprof')
import apparmor.config
import apparmor.severity
import LibAppArmor
from apparmor.common import AppArmorException, error, debug, msg, open_file_read, readkey, valid_path
CONFDIR = '/etc/apparmor'
running_under_genprof = False
@@ -72,7 +74,7 @@ pid = None
seen = dir()
profile_changes = dict()
prelog = dict()
log = dict()
log_dict = dict()
changed = dict()
created = []
helpers = dict() # Preserve this between passes # was our
@@ -264,7 +266,7 @@ def get_full_path(original_path):
"""Return the full path after resolving any symlinks"""
path = original_path
link_count = 0
if not '/' in path:
if not path.startswith('/'):
path = os.getcwd() + '/' + path
while os.path.islink(path):
link_count += 1
@@ -286,8 +288,8 @@ def find_executable(bin_path):
if os.path.exists(bin_path):
full_bin = get_full_path(bin_path)
else:
if '/' not in bin:
env_bin = which(bin)
if '/' not in bin_path:
env_bin = which(bin_path)
if env_bin:
full_bin = get_full_path(env_bin)
if full_bin and os.path.exists(full_bin):
@@ -296,45 +298,44 @@ def find_executable(bin_path):
def get_profile_filename(profile):
"""Returns the full profile name"""
filename = profile
if filename.startswith('/'):
if profile.startswith('/'):
# Remove leading /
filename = filename[1:]
profile = profile[1:]
else:
filename = "profile_" + filename
filename.replace('/', '.')
full_profilename = profile_dir + '/' + filename
profile = "profile_" + profile
profile.replace('/', '.')
full_profilename = profile_dir + '/' + profile
return full_profilename
def name_to_prof_filename(filename):
def name_to_prof_filename(prof_filename):
"""Returns the profile"""
if bin.startswith(profile_dir):
profile = filename.split(profile_dir, 1)[1]
return (filename, profile)
if prof_filename.startswith(profile_dir):
profile = prof_filename.split(profile_dir, 1)[1]
return (prof_filename, profile)
else:
bin_path = find_executable(filename)
bin_path = find_executable(prof_filename)
if bin_path:
filename = get_profile_filename(bin_path)
if os.path.isfile(filename):
return (filename, bin_path)
prof_filename = get_profile_filename(bin_path)
if os.path.isfile(prof_filename):
return (prof_filename, bin_path)
else:
return None, None
def complain(path):
"""Sets the profile to complain mode if it exists"""
filename, name = name_to_prof_filename(path)
if not filename :
prof_filename, name = name_to_prof_filename(path)
if not prof_filename :
fatal_error("Can't find %s" % path)
UI_Info('Setting %s to complain mode.' % name)
set_profile_flags(filename, 'complain')
set_profile_flags(prof_filename, 'complain')
def enforce(path):
"""Sets the profile to complain mode if it exists"""
filename, name = name_to_prof_filename(path)
if not filename :
prof_filename, name = name_to_prof_filename(path)
if not prof_filename :
fatal_error("Can't find %s" % path)
UI_Info('Setting %s to enforce moode' % name)
set_profile_flags(filename, '')
set_profile_flags(prof_filename, '')
def head(file):
"""Returns the first/head line of the file"""
@@ -375,7 +376,7 @@ def get_reqs(file):
pattern1 = re.compile('^\s*\S+ => (\/\S+)')
pattern2 = re.compile('^\s*(\/\S+)')
reqs = []
ret, ldd_out = get_output(ldd, file)
ret, ldd_out = get_output([ldd, file])
if ret == 0:
for line in ldd_out:
if 'not a dynamic executable' in line:
@@ -393,6 +394,263 @@ def get_reqs(file):
reqs.append(match.groups()[0])
return reqs
def handle_binfmt(profile, fqdbin):
reqs = dict()
def handle_binfmt(profile, path):
"""Modifies the profile to add the requirements"""
reqs_processed = dict()
reqs = get_reqs(path)
while reqs:
library = reqs.pop()
if not reqs_processed.get(library, False):
reqs.append(get_reqs(library))
reqs_processed[library] = True
combined_mode = match_prof_incs_to_path(profile, 'allow', library)
if combined_mode:
continue
library = glob_common(library)
if not library:
continue
try:
profile['allow']['path'][library]['mode'] |= str_to_mode('mr')
except TypeError:
profile['allow']['path'][library]['mode'] = str_to_mode('mr')
try:
profile['allow']['path'][library]['audit'] |= 0
except TypeError:
profile['allow']['path'][library]['audit'] = 0
def get_inactive_profile(local_profile):
if extras.get(local_profile, False):
return {local_profile: extras[local_profile]}
return dict()
def create_new_profile(localfile):
local_profile = hasher()
local_profile[localfile]['flags'] = 'complain'
local_profile[localfile]['include']['abstractions/base'] = 1
#local_profile = {
# localfile: {
# 'flags': 'complain',
# 'include': {'abstraction/base': 1},
# 'allow': {'path': {}}
# }
# }
if os.path.isfile(localfile):
hashbang = head(localfile)
if hashbang.startswith('#!'):
interpreter = get_full_path(hashbang.lstrip('#!').strip())
try:
local_profile[localfile]['allow']['path'][localfile]['mode'] |= str_to_mode('r')
except TypeError:
local_profile[localfile]['allow']['path'][localfile]['mode'] = str_to_mode('r')
try:
local_profile[localfile]['allow']['path'][localfile]['audit'] |= 0
except TypeError:
local_profile[localfile]['allow']['path'][localfile]['audit'] = 0
try:
local_profile[localfile]['allow']['path'][interpreter]['mode'] |= str_to_mode('ix')
except TypeError:
local_profile[localfile]['allow']['path'][interpreter]['mode'] = str_to_mode('ix')
try:
local_profile[localfile]['allow']['path'][interpreter]['audit'] |= 0
except TypeError:
local_profile[localfile]['allow']['path'][interpreter]['audit'] = 0
if 'perl' in interpreter:
local_profile[localfile]['include']['abstractions/perl'] = 1
elif 'python' in interpreter:
local_profile[localfile]['include']['abstractions/python'] = 1
elif 'ruby' in interpreter:
local_profile[localfile]['include']['abstractions/ruby'] = 1
elif '/bin/bash' in interpreter or '/bin/dash' in interpreter or '/bin/sh' in interpreter:
local_profile[localfile]['include']['abstractions/ruby'] = 1
handle_binfmt(local_profile[localfile], interpreter)
else:
try:
local_profile[localfile]['allow']['path'][localfile]['mode'] |= str_to_mode('mr')
except TypeError:
local_profile[localfile]['allow']['path'][localfile]['mode'] = str_to_mode('mr')
try:
local_profile[localfile]['allow']['path'][localfile]['audit'] |= 0
except TypeError:
local_profile[localfile]['allow']['path'][localfile] = 0
handle_binfmt(local_profile[localfile], localfile)
# Add required hats to the profile if they match the localfile
for hatglob in cfg['required_hats'].keys():
if re.search(hatglob, localfile):
for hat in sorted(cfg['required_hats'][hatglob].split()):
local_profile[hat]['flags'] = 'complain'
created.append(localfile)
if DEBUGGING:
debug_logger.debug("Profile for %s:\n\t%s" % (localfile, local_profile.__str__()))
return {localfile: local_profile}
def delete_profile(local_prof):
"""Deletes the specified file from the disk and remove it from our list"""
profile_file = get_profile_filename(local_prof)
if os.path.isfile(profile_file):
os.remove(profile_file)
if aa.get(local_prof, False):
aa.pop(local_prof)
def get_profile(prof_name):
profile_data = None
distro = cfg['repository']['distro']
repo_url = cfg['repository']['url']
local_profiles = []
profile_hash = hasher()
if repo_is_enabled():
UI_BusyStart('Coonecting to repository.....')
status_ok, ret = fetch_profiles_by_name(repo_url, distro, prof_name)
UI_BustStop()
if status_ok:
profile_hash = ret
else:
UI_Important('WARNING: Error fetching profiles from the repository')
inactive_profile = get_inactive_profile(prof_name)
if inactive_profile:
uname = 'Inactive local profile for %s' % prof_name
inactive_profile[prof_name][prof_name]['flags'] = 'complain'
inactive_profile[prof_name][prof_name].pop('filename')
profile_hash[uname]['username'] = uname
profile_hash[uname]['profile_type'] = 'INACTIVE_LOCAL'
profile_hash[uname]['profile'] = serialize_profile(inactive_profile[prof_name], prof_name)
profile_hash[uname]['profile_data'] = inactive_profile
# If no profiles in repo and no inactive profiles
if not profile_hash.keys():
return None
options = []
tmp_list = []
preferred_present = False
preferred_user = cfg['repository'].get('preferred_user', 'NOVELL')
for p in profile_hash.keys():
if profile_hash[p]['username'] == preferred_user:
preferred_present = True
else:
tmp_list.append(profile_hash[p]['username'])
if preferred_present:
options.append(preferred_user)
options += tmp_list
q = dict()
q['headers'] = ['Profile', prof_name]
q['functions'] = ['CMD_VIEW_PROFILE', 'CMD_USE_PROFILE', 'CMD_CREATE_PROFILE',
'CMD_ABORT', 'CMD_FINISHED']
q['default'] = "CMD_VIEW_PROFILE"
q['options'] = options
q['selected'] = 0
ans = ''
while 'CMD_USE_PROFILE' not in ans and 'CMD_CREATE_PROFILE' not in ans:
ans, arg = UI_PromptUser(q)
p = profile_hash[options[arg]]
q['selected'] = options.index(options[arg])
if ans == 'CMD_VIEW_PROFILE':
if UI_mode == 'yast':
SendDataToYast({
'type': 'dialogue-view-profile',
'user': options[arg],
'profile': p['profile'],
'profile_type': p['profile_type']
})
ypath, yarg = GetDataFromYast()
else:
pager = get_pager()
proc = subprocess.Popen(pager, stdin=subprocess.PIPE)
proc.communicate('Profile submitted by %s:\n\n%s\n\n' %
(options[arg], p['profile']))
proc.kill()
elif ans == 'CMD_USE_PROFILE':
if p['profile_type'] == 'INACTIVE_LOCAL':
profile_data = p['profile_data']
created.append(prof_name)
else:
profile_data = parse_repo_profile(prof_name, repo_url, p)
return profile_data
def activate_repo_profiles(url, profiles, complain):
read_profiles()
try:
for p in profiles:
pname = p[0]
profile_data = parse_repo_profile(pname, url, p[1])
attach_profile_data(aa, profile_data)
write_profile(pname)
if complain:
fname = get_profile_filename(pname)
set_profile_flags(fname, 'complain')
UI_Info('Setting %s to complain mode.' % pname)
except Exception as e:
sys.stderr.write("Error activating profiles: %s" % e)
def autodep(bin_name, pname=''):
bin_full = None
if not bin_name and pname.startswith('/'):
bin_name = pname
if not repo_cfg and not cfg['repository'].get('url', False):
repo_cfg = read_config('repository.conf')
if not repo_cfg.get('repository', False) or repo_cfg['repository']['enabled'] == 'later':
UI_ask_to_enable_repo()
if bin_name:
bin_full = find_executable(bin_name)
#if not bin_full:
# bin_full = bin_name
#if not bin_full.startswith('/'):
#return None
# Return if exectuable path not found
if not bin_full:
return None
pname = bin_full
read_inactive_profile()
profile_data = get_profile(pname)
# Create a new profile if no existing profile
if not profile_data:
profile_data = create_new_profile(pname)
file = get_profile_filename(pname)
attach_profile_data(aa, profile_data)
attach_profile_data(aa_original, profile_data)
if os.path.isfile(profile_dir + '/tunables/global'):
if not filelist.get(file, False):
filelist.file = hasher()
filelist[file][include]['tunables/global'] = True
write_profile_ui_feedback(pname)
def set_profile_flags(prof_filename, newflags):
"""Reads the old profile file and updates the flags accordingly"""
regex_bin_flag = re.compile('^(\s*)(("??\/.+?"??)|(profile\s+("??.+?"??)))\s+(flags=\(.+\)\s+)*\{\s*$/')
regex_hat_flag = re.compile('^(\s*\^\S+)\s+(flags=\(.+\)\s+)*\{\s*$')
if os.path.isfile(prof_filename):
with open_file_read(prof_filename) as f_in:
with open_file_write(prof_filename + '.new') as f_out:
for line in f_in:
match = regex_bin_flag.search(line)
if match:
space, binary, flags = match.groups()
if newflags:
line = '%s%s flags=(%s) {\n' % (space, binary, newflags)
else:
line = '%s%s {\n' % (space, binary)
else:
match = regex_hat_flag.search(line)
if match:
hat, flags = match.groups()
if newflags:
line = '%s flags=(%s) {\n' % (hat, newflags)
else:
line = '%s {\n' % hat
f_out.write(line)
os.rename(prof_filename+'.new', prof_filename)
def profile_exists(program):
"""Returns True if profile exists, False otherwise"""
# Check cache of profiles
if existing_profiles.get(program, False):
return True
# Check the disk for profile
prof_path = get_profile_filename(program)
if os.path.isfile(prof_path):
# Add to cache of profile
existing_profiles[program] = True
return True
return False

View File

@@ -1,5 +1,6 @@
from __future__ import print_function
import codecs
import collections
import glob
import os
import subprocess
@@ -18,6 +19,7 @@ class AppArmorException(Exception):
self.value = value
def __str__(self):
return self.value
return repr(self.value)
#
@@ -121,12 +123,20 @@ def get_directory_contents(path):
def open_file_read(path):
'''Open specified file read-only'''
try:
orig = codecs.open(path, 'r', "UTF-8")
orig = codecs.open(path, 'r', 'UTF-8')
except Exception:
raise
return orig
def open_file_write(path):
"""Open specified file in write/overwrite mode"""
try:
orig = codecs.open(path, 'w', 'UTF-8')
except Exception:
raise
return orig
def readkey():
"""Returns the pressed key"""
fd = sys.stdin.fileno()
@@ -137,4 +147,9 @@ def readkey():
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
return ch
def hasher():
"""A neat alternative to perl's hash reference"""
# Creates a dictionary for any depth and returns empty dictionary otherwise
return collections.defaultdict(hasher)

View File

@@ -20,12 +20,12 @@ SHELL_FILES = ['easyprof.conf', 'notify.conf', 'parser.conf', 'subdomain.conf']
class Config:
def __init__(self, conf_type):
# The type of config file that'll be read and/or written
if conf_type != 'shell' or conf_type != 'ini':
raise AppArmorException("Unknown configuration file type")
else:
if conf_type == 'shell' or conf_type == 'ini':
self.conf_type = conf_type
self.input_file = None
else:
raise AppArmorException("Unknown configuration file type")
def new_config(self):
if self.conf_type == 'shell':
config = {'': dict()}
@@ -82,11 +82,10 @@ class Config:
def find_first_file(self, file_list):
"""Returns name of first matching file None otherwise"""
filename = None
if filename:
for file in file_list.split():
if os.path.isfile(file):
filename = file
break
for file in file_list.split():
if os.path.isfile(file):
filename = file
break
return filename
def find_first_dir(self, dir_list):

View File

@@ -28,7 +28,7 @@ class Severity:
line = line.split('+=')
try:
self.severity['VARIABLES'][line[0]] += [i.strip('"') for i in line[1].split()]
except KeyError:
except KeyError as e:
raise AppArmorException("Variable %s was not previously declared, but is being assigned additional values" % line[0])
else:
line = line.split('=')
@@ -75,8 +75,10 @@ class Severity:
try:
resource, severity = line.split()
severity = int(severity)
except ValueError:
raise AppArmorException("No severity value present in file: %s\n\t[Line %s]: %s" % (dbname, lineno, line))
except ValueError as e:
error_message = 'No severity value present in file: %s\n\t[Line %s]: %s' % (dbname, lineno, line)
error(error_message)
raise AppArmorException(error_message) from None
else:
if severity not in range(0,11):
raise AppArmorException("Inappropriate severity value present in file: %s\n\t[Line %s]: %s" % (dbname, lineno, line))