2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-29 21:38:15 +00:00

Some fixes from review16 and updated codebase

This commit is contained in:
Kshitij Gupta 2013-07-17 20:38:13 +05:30
parent ccee5cd5e0
commit a33c95f8b1
5 changed files with 596 additions and 10 deletions

View File

@ -6,7 +6,7 @@ Created on Jun 21, 2013
import sys
import unittest
sys.path.append('../apparmor')
sys.path.append('../')
import apparmor.severity as severity
from apparmor.common import AppArmorException

View File

@ -1,4 +1,4 @@
#1082
#1321
#382-430
#480-525
#global variable names corruption
@ -7,10 +7,12 @@ import inspect
import logging
import os
import re
import shutil
import subprocess
import sys
import traceback
import atexit
import tempfile
import apparmor.config
import apparmor.severity
@ -20,6 +22,8 @@ from apparmor.common import (AppArmorException, error, debug, msg,
open_file_read, readkey, valid_path,
hasher, open_file_write)
from apparmor.ui import *
DEBUGGING = False
debug_logger = None
@ -491,6 +495,7 @@ def delete_profile(local_prof):
os.remove(profile_file)
if aa.get(local_prof, False):
aa.pop(local_prof)
prof_unload(local_prof)
def get_profile(prof_name):
profile_data = None
@ -619,28 +624,36 @@ def autodep(bin_name, pname=''):
def set_profile_flags(prof_filename, newflags):
"""Reads the old profile file and updates the flags accordingly"""
regex_bin_flag = re.compile('^(\s*)(("??\/.+?"??)|(profile\s+("??.+?"??)))\s+(flags=\(.+\)\s+)*\{\s*$/')
regex_hat_flag = re.compile('^(\s*\^\S+)\s+(flags=\(.+\)\s+)*\{\s*$')
regex_hat_flag = re.compile('^([a-z]*)\s+([A-Z]*)((\s+#\S*)*)\s*$')
a=re.compile('^([a-z]*)\s+([A-Z]*)((\s+#\S*)*)\s*$')
regex_hat_flag = re.compile('^(\s*\^\S+)\s+(flags=\(.+\)\s+)*\{\s*(#*\S*)$')
if os.path.isfile(prof_filename):
with open_file_read(prof_filename) as f_in:
with open_file_write(prof_filename + '.new') as f_out:
tempfile = tempfile.NamedTemporaryFile('w', prefix=prof_filename , delete=False, dir='/etc/apparmor.d/')
shutil.copymode('/etc/apparmor.d/' + prof_filename, tempfile.name)
with open_file_write(tempfile.name) as f_out:
for line in f_in:
if '#' in line:
comment = '#' + line.split('#', 1)[1].rstrip()
else:
comment = ''
match = regex_bin_flag.search(line)
if match:
space, binary, flags = match.groups()
if newflags:
line = '%s%s flags=(%s) {\n' % (space, binary, newflags)
line = '%s%s flags=(%s) {%s\n' % (space, binary, newflags, comment)
else:
line = '%s%s {\n' % (space, binary)
line = '%s%s {%s\n' % (space, binary, comment)
else:
match = regex_hat_flag.search(line)
if match:
hat, flags = match.groups()
if newflags:
line = '%s flags=(%s) {\n' % (hat, newflags)
line = '%s flags=(%s) {%s\n' % (hat, newflags, comment)
else:
line = '%s {\n' % hat
line = '%s {%s\n' % (hat, comment)
f_out.write(line)
os.rename(prof_filename+'.new', prof_filename)
os.rename(tempfile.name, prof_filename)
def profile_exists(program):
"""Returns True if profile exists, False otherwise"""
@ -654,3 +667,240 @@ def profile_exists(program):
existing_profiles[program] = True
return True
return False
def sync_profile():
user, passw = get_repo_user_pass()
if not user or not passw:
return None
repo_profiles = []
changed_profiles = []
new_profiles = []
serialize_opts = hasher()
status_ok, ret = fetch_profiles_by_user(cfg['repository']['url'],
cfg['repository']['distro'], user)
if not status_ok:
if not ret:
ret = 'UNKNOWN ERROR'
UI_Important('WARNING: Error synchronizing profiles with the repository:\n%s\n' % ret)
else:
users_repo_profiles = ret
serialuze_opts['NO_FLAGS'] = True
for prof in sorted(aa.keys()):
if is_repo_profile([aa[prof][prof]]):
repo_profiles.append(prof)
if prof in created:
p_local = seralize_profile(aa[prof], prof, serialize_opts)
if not users_repo_profiles.get(prof, False):
new_profiles.append(prof)
new_profiles.append(p_local)
new_profiles.append('')
else:
p_repo = users_repo_profiles[prof]['profile']
if p_local != p_repo:
changed_profiles.append(prof)
changed_profiles.append(p_local)
changed_profiles.append(p_repo)
if repo_profiles:
for prof in repo_profiles:
p_local = serialize_profile(aa[prof], prof, serialize_opts)
if not users_repo_profiles.get(prof, False):
new_profiles.append(prof)
new_profiles.append(p_local)
new_profiles.append('')
else:
p_repo = ''
if aa[prof][prof]['repo']['user'] == user:
p_repo = users_repo_profiles[prof]['profile']
else:
status_ok, ret = fetch_profile_by_id(cfg['repository']['url'],
aa[prof][prof]['repo']['id'])
if status_ok:
p_repo = ret['profile']
else:
if not ret:
ret = 'UNKNOWN ERROR'
UI_Important('WARNING: Error synchronizing profiles witht he repository\n%s\n' % ret)
continue
if p_repo != p_local:
changed_profiles.append(prof)
changed_profiles.append(p_local)
changed_profiles.append(p_repo)
if changed_profiles:
submit_changed_profiles(changed_profiles)
if new_profiles:
submit_created_profiles(new_profiles)
def submit_created_profiles(new_profiles):
#url = cfg['repository']['url']
if new_profiles:
if UI_mode == 'yast':
title = 'New Profiles'
message = 'Please select the newly created profiles that you would like to store in the repository'
yast_select_and_upload_profiles(title, message, new_profiles)
else:
title = 'Submit newly created profiles to the repository'
message = 'Would you like to upload newly created profiles?'
console_select_and_upload_profiles(title, message, new_profiles)
def submit_changed_profiles(changed_profiles):
#url = cfg['repository']['url']
if changed_profiles:
if UI_mode == 'yast':
title = 'Changed Profiles'
message = 'Please select which of the changed profiles would you like to upload to the repository'
yast_select_and_upload_profiles(title, message, changed_profiles)
else:
title = 'Submit changed profiles to the repository'
message = 'The following profiles from the repository were changed.\nWould you like to upload your changes?'
console_select_and_upload_profiles(title, message, changed_profiles)
def yast_select_and_upload_profiles(title, message, profiles_up):
url = cfg['repository']['url']
profile_changes = hasher()
profs = profiles_up[:]
for p in profs:
profile_changes[p[0]] = get_profile_diff(p[2], p[1])
SendDataToYast({
'type': 'dialog-select-profiles',
'title': title,
'explanation': message,
'default_select': 'false',
'disable_ask_upload': 'true',
'profiles': profile_changes
})
ypath, yarg = GetDataFromYast()
selected_profiles = []
changelog = None
changelogs = None
single_changelog = False
if yarg['STATUS'] == 'cancel':
return
else:
selected_profiles = yarg['PROFILES']
changelogs = yarg['CHANGELOG']
if changelogs.get('SINGLE_CHANGELOG', False):
changelog = changelogs['SINGLE_CHANGELOG']
single_changelog = True
user, passw = get_repo_user_pass()
for p in selected_profiles:
profile_string = serialize_profile(aa[p], p)
if not single_changelog:
changelog = changelogs[p]
status_ok, ret = upload_profile(url, user, passw, cfg['repository']['distro'],
p, profile_string, changelog)
if status_ok:
newprofile = ret
newid = newprofile['id']
set_repo_info(aa[p][p], url, user, newid)
write_profile_ui_feedback(p)
else:
if not ret:
ret = 'UNKNOWN ERROR'
UI_Important('WARNING: An error occured while uploading the profile %s\n%s\n' % (p, ret))
UI_Info('Uploaded changes to repository.')
if yarg.get('NEVER_ASK_AGAIN'):
unselected_profiles = []
for p in profs:
if p[0] not in selected_profiles:
unselected_profiles.append(p[0])
set_profiles_local_only(unselected_profiles)
def console_select_and_upload_profiles(title, message, profiles_up):
url = cfg['repository']['url']
profs = profiles_up[:]
q = hasher()
q['title'] = title
q['headers'] = ['Repository', url]
q['explanation'] = message
q['functions'] = ['CMD_UPLOAD_CHANGES', 'CMD_VIEW_CHANGES', 'CMD_ASK_LATER',
'CMD_ASK_NEVER', 'CMD_ABORT']
q['default'] = 'CMD_VIEW_CHANGES'
q['options'] = [i[0] for i in profs]
q['selected'] = 0
ans = ''
while 'CMD_UPLOAD_CHANGES' not in ans and 'CMD_ASK_NEVER' not in ans and 'CMD_ASK_LATER' not in ans:
ans, arg = UI_PromptUser(q)
if ans == 'CMD_VIEW_CHANGES':
display_changes(profs[arg][2], profs[arg][1])
if ans == 'CMD_NEVER_ASK':
set_profiles_local_only([i[0] for i in profs])
elif ans == 'CMD_UPLOAD_CHANGES':
changelog = UI_GetString('Changelog Entry: ', '')
user, passw = get_repo_user_pass()
if user and passw:
for p_data in profs:
prof = p_data[0]
prof_string = p_data[1]
status_ok, ret = upload_profile(url, user, passw,
cfg['repository']['distro'],
prof, prof_string, changelog )
if status_ok:
newprof = ret
newid = newprof['id']
set_repo_info(aa[prof][prof], url, user, newid)
write_profile_ui_feedback(prof)
UI_Info('Uploaded %s to repository' % prof)
else:
if not ret:
ret = 'UNKNOWN ERROR'
UI_Important('WARNING: An error occured while uploading the profile %s\n%s\n' % (prof, ret))
else:
UI_Important('Repository Error\nRegistration or Sigin was unsuccessful. User login\n' +
'information is required to upload profiles to the repository.\n' +
'These changes could not be sent.\n')
def set_profile_local_only(profs):
for p in profs:
aa[profs][profs]['repo']['neversubmit'] = True
writeback_ui_feedback(profs)
def confirm_and_abort():
ans = UI_YesNo('Are you sure you want to abandon this set of profile changes and exit?', 'n')
if ans == 'y':
UI_Info('Abandoning all changes.')
shutdown_yast()
for prof in created:
delete_profile(prof)
sys.exit(0)
def confirm_and_finish():
sys.stdout.write('Finishing\n')
sys.exit(0)
def build_x_functions(default, options, exec_toggle):
ret_list = []
if exec_toggle:
if 'i' in options:
ret_list.append('CMD_ix')
if 'p' in options:
ret_list.append('CMD_pix')
ret_list.append('CMD_EXEC_IX_OFF')
elif 'c' in options:
ret_list.append('CMD_cix')
ret_list.append('CMD_EXEC_IX_OFF')
elif 'n' in options:
ret_list.append('CMD_nix')
ret_list.append('CMD_EXEC_IX_OFF')
elif 'u' in options:
ret_list.append('CMD_ux')
else:
if 'i' in options:
ret_list.append('CMD_ix')
elif 'c' in options:
ret_list.append('CMD_cx')
ret_list.append('CMD_EXEC_IX_ON')
elif 'p' in options:
ret_list.append('CMD_px')
ret_list.append('CMD_EXEC_IX_OFF')
elif 'n' in options:
ret_list.append('CMD_nx')
ret_list.append('CMD_EXEC_IX_OFF')
elif 'u' in options:
ret_list.append('CMD_ux')
ret_list += ['CMD_DENY', 'CMD_ABORT', 'CMD_FINISHED']
return ret_list
def handle_children(profile, hat, root):
entries = root[:]
for entry in entries:

View File

@ -19,7 +19,6 @@ class AppArmorException(Exception):
self.value = value
def __str__(self):
return self.value
return repr(self.value)
#

255
apparmor/ui.py Normal file
View File

@ -0,0 +1,255 @@
# 1728
import sys
import gettext
import locale
from yasti import yastLog, SendDataToYast, GetDataFromYast
from apparmor.common import readkey
DEBUGGING = False
debug_logger = None
UI_mode = 'text'
def init_localisation():
locale.setlocale(locale.LC_ALL, '')
#cur_locale = locale.getlocale()
filename = 'res/messages_%s.mo' % locale.getlocale()[0][0:2]
try:
trans = gettext.GNUTranslations(open( filename, 'rb'))
except IOError:
trans = gettext.NullTranslations()
trans.install()
def UI_Info(text):
if DEBUGGING:
debug_logger.info(text)
if UI_mode == 'text':
sys.stdout.write(text + '\n')
else:
yastLog(text)
def UI_Important(text):
if DEBUGGING:
debug_logger.debug(text)
if UI_mode == 'text':
sys.stdout.write('\n' + text + '\n')
else:
SendDataToYast({
'type': 'dialog-error',
'message': text
})
path, yarg = GetDataFromYast()
def UI_YesNo(text, default):
if DEBUGGING:
debug_logger.debug('UI_YesNo: %s: %s %s' %(UI_mode, text, default))
ans = default
if UI_mode == 'text':
yes = '(Y)es'
no = '(N)o'
usrmsg = 'PromptUser: Invalid hotkey for'
yeskey = 'y'
nokey = 'n'
sys.stdout.write('\n' + text + '\n')
if default == 'y':
sys.stdout.write('\n[%s] / %s\n' % (yes, no))
else:
sys.stdout.write('\n%s / [%s]\n' % (yes, no))
ans = readkey()
if ans:
ans = ans.lower()
else:
ans = default
else:
SendDataTooYast({
'type': 'dialog-yesno',
'question': text
})
ypath, yarg = GetDataFromYast()
ans = yarg['answer']
if not ans:
ans = default
return ans
def UI_YesNoCancel(text, default):
if DEBUGGING:
debug_logger.debug('UI_YesNoCancel: %s: %s %s' % (UI_mode, text, default))
if UI_mode == 'text':
yes = '(Y)es'
no = '(N)o'
cancel = '(C)ancel'
yeskey = 'y'
nokey = 'n'
cancelkey = 'c'
ans = 'XXXINVALIDXXX'
while ans != 'c' and ans != 'n' and ans != 'y':
sys.stdout.write('\n' + text + '\n')
if default == 'y':
sys.stdout.write('\n[%s] / %s / %s\n' % (yes, no, cancel))
elif default == 'n':
sys.stdout.write('\n%s / [%s] / %s\n' % (yes, no, cancel))
else:
sys.stdout.write('\n%s / %s / [%s]\n' % (yes, no, cancel))
ans = readkey()
if ans:
ans = ans.lower()
else:
ans = default
else:
SendDataToYast({
'type': 'dialog-yesnocancel',
'question': text
})
ypath, yarg = GetDataFromYast()
ans = yarg['answer']
if not ans:
ans = default
return ans
def UI_GetString(text, default):
if DEBUGGING:
debug_logger.debug('UI_GetString: %s: %s %s' % (UI_mode, text, default))
string = default
if UI_mode == 'text':
sys.stdout.write('\n' + text + '\n')
string = sys.stdin.readline()
else:
SendDataToYast({
'type': 'dialog-getstring',
'label': text,
'default': default
})
ypath, yarg = GetDatFromYast()
string = yarg['string']
return string
def UI_GetFile(file):
if DEBUGGING:
debug_logger.debug('UI_GetFile: %s' % UI_mode)
filename = None
if UI_mode == 'text':
sys.stdout.write(file['description'] + '\n')
filename = sys.stdin.read()
else:
file['type'] = 'dialog-getfile'
SendDataToYast(file)
ypath, yarg = GetDataFromYast()
if yarg['answer'] == 'okay':
filename = yarg['filename']
return filename
def UI_BusyStart(message):
if DEBUGGING:
debug_logger.debug('UI_BusyStart: %s' % UI_mode)
if UI_mode == 'text':
UI_Info(message)
else:
SendDataToYast({
'type': 'dialog-busy-start',
'message': message
})
ypath, yarg = GetDataFromYast()
def UI_BusyStop():
if DEBUGGING:
debug_logger.debug('UI_BusyStop: %s' % UI_mode)
if UI_mode != 'text':
SendDataToYast({'type': 'dialog-busy-stop'})
ypath, yarg = GetDataFromYast()
CMDS = {
'CMD_ALLOW': '(A)llow',
'CMD_OTHER': '(M)ore',
'CMD_AUDIT_NEW': 'Audi(t)',
'CMD_AUDIT_OFF': 'Audi(t) off',
'CMD_AUDIT_FULL': 'Audit (A)ll',
#'CMD_OTHER': '(O)pts',
'CMD_USER_ON': '(O)wner permissions on',
'CMD_USER_OFF': '(O)wner permissions off',
'CMD_DENY': '(D)eny',
'CMD_ABORT': 'Abo(r)t',
'CMD_FINISHED': '(F)inish',
'CMD_ix': '(I)nherit',
'CMD_px': '(P)rofile',
'CMD_px_safe': '(P)rofile Clean Exec',
'CMD_cx': '(C)hild',
'CMD_cx_safe': '(C)hild Clean Exec',
'CMD_nx': 'Named',
'CMD_nx_safe': 'Named Clean Exec',
'CMD_ux': '(U)nconfined',
'CMD_ux_safe': '(U)nconfined Clean Exec',
'CMD_pix': '(P)rofile Inherit',
'CMD_pix_safe': '(P)rofile Inherit Clean Exec',
'CMD_cix': '(C)hild Inherit',
'CMD_cix_safe': '(C)hild Inherit Clean Exec',
'CMD_nix': '(N)amed Inherit',
'CMD_nix_safe': '(N)amed Inherit Clean Exec',
'CMD_EXEC_IX_ON': '(X) ix On',
'CMD_EXEC_IX_OFF': '(X) ix Off',
'CMD_SAVE': '(S)ave Changes',
'CMD_CONTINUE': '(C)ontinue Profiling',
'CMD_NEW': '(N)ew',
'CMD_GLOB': '(G)lob',
'CMD_GLOBEXT': 'Glob with (E)xtension',
'CMD_ADDHAT': '(A)dd Requested Hat',
'CMD_USEDEFAULT': '(U)se Default Hat',
'CMD_SCAN': '(S)can system log for AppArmor events',
'CMD_HELP': '(H)elp',
'CMD_VIEW_PROFILE': '(V)iew Profile',
'CMD_USE_PROFILE': '(U)se Profile',
'CMD_CREATE_PROFILE': '(C)reate New Profile',
'CMD_UPDATE_PROFILE': '(U)pdate Profile',
'CMD_IGNORE_UPDATE': '(I)gnore Update',
'CMD_SAVE_CHANGES': '(S)ave Changes',
'CMD_UPLOAD_CHANGES': '(U)pload Changes',
'CMD_VIEW_CHANGES': '(V)iew Changes',
'CMD_VIEW': '(V)iew',
'CMD_ENABLE_REPO': '(E)nable Repository',
'CMD_DISABLE_REPO': '(D)isable Repository',
'CMD_ASK_NEVER': '(N)ever Ask Again',
'CMD_ASK_LATER': 'Ask Me (L)ater',
'CMD_YES': '(Y)es',
'CMD_NO': '(N)o',
'CMD_ALL_NET': 'Allow All (N)etwork',
'CMD_NET_FAMILY': 'Allow Network Fa(m)ily',
'CMD_OVERWRITE': '(O)verwrite Profile',
'CMD_KEEP': '(K)eep Profile',
'CMD_CONTINUE': '(C)ontinue'
}
def UI_PromptUser(q):
cmd = None
arg = None
if UI_mode == 'text':
cmd, arg = Text_PromptUser(q)
else:
q['type'] = 'wizard'
SendDataToYast(q)
ypath, yarg = GetDataFromYast()
if not cmd:
cmd = 'CMD_ABORT'
arg = yarg['selected']
if cmd == 'CMD_ABORT':
confirm_and_abort()
cmd == 'XXXINVALIDXXX'
elif cmd == 'CMD_FINISHED':
confirm_and_finish()
cmd == 'XXXINVALIDXXX'
return (cmd, arg)
def UI_ShortMessage(title, message):
SendDataToYast({
'type': 'short-dialog-message',
'headline': title,
'message': message
})
ypath, yarg = GetDataFromYast()
def UI_longMessage(title, message):
SendDataToYast({
'type': 'long-dialog-message',
'headline': title,
'message': message
})
ypath, yarg = GetDataFromYast()

82
apparmor/yasti.py Normal file
View File

@ -0,0 +1,82 @@
import re
import ycp
def yastLog(text):
ycp.y2milestone(text)
def SendDataToYast(data):
if DEBUGGING:
debug_logger.info('SendDataToYast: Waiting for YCP command')
for line in sys.stdin:
ycommand, ypath, yargument = ParseCommand(line)
if ycommand and ycommand == 'Read':
if DEBUGGING:
debug_logger.info('SendDataToYast: Sending--%s' % data)
Return(data)
return True
else:
if DEBUGGING:
debug_logger.info('SendDataToYast: Expected \'Read\' but got-- %s' % line)
fatal_error('SendDataToYast: didn\'t receive YCP command before connection died')
def GetDataFromYast():
if DEBUGGING:
debug_logger.inf('GetDataFromYast: Waiting for YCP command')
for line in sys.stdin:
if DEBUGGING:
debug_logger.info('GetDataFromYast: YCP: %s' % line)
ycommand, ypath, yarg = ParseCommand(line)
if DEBUGGING:
debug_logger.info('GetDataFromYast: Recieved--\n%s' % yarg)
if ycommand and ycommand == 'Write':
Return('true')
return ypath, yarg
else:
if DEBUGGING:
debug_logger.info('GetDataFromYast: Expected Write but got-- %s' % line)
fatal_error('GetDataFromYast: didn\'t receive YCP command before connection died')
def ParseCommand(commands):
term = ParseTerm(commands)
if term:
command = term[0]
term = term[1:]
else:
command = ''
path = ''
pathref = None
if term:
pathref = term[0]
term = term[1:]
if pathref:
if pathref.strip():
path = pathref.strip()
elif command != 'result':
ycp.y2error('The first arguement is not a path. (%s)' % pathref)
argument = None
if term:
argument = term[0]
if len(term) > 1:
ycp.y2warning('Superfluous command arguments ignored')
return (command, path, argument)
def ParseTerm(input):
regex_term = re.compile('^\s*`?(\w*)\s*')
term = regex_term.search(input)
ret = []
symbol = None
if term:
symbol = term.groups()[0]
else:
ycp.y2error('No term symbol')
ret.append(symbol)
input = regex_term.sub('', input)
if not input.startswith('('):
ycp.y2error('No term parantheses')
argref, err, rest = ParseYcpTermBody(input)
if err:
ycp.y2error('%s (%s)' % (err, rest))
else:
ret += argref
return ret