2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-31 14:25:52 +00:00

Python2 compatible code except for configparser, code from week2

This commit is contained in:
Kshitij Gupta
2013-07-04 04:12:04 +05:30
parent e4ad1bde21
commit b3767766ef
9 changed files with 586 additions and 344 deletions

View File

@@ -1,104 +0,0 @@
# ------------------------------------------------------------------
#
# Copyright (C) 2004-2006 Novell/SUSE
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License published by the Free Software Foundation.
#
# ------------------------------------------------------------------
[repository]
distro = ubuntu-intrepid
url = http://apparmor.test.opensuse.org/backend/api
preferred_user = ubuntu
[qualifiers]
# things will be painfully broken if bash has a profile
/bin/bash = icnu
/bin/ksh = icnu
/bin/dash = icnu
# these programs can't function if they're confined
/bin/mount = u
/etc/init.d/subdomain = u
/sbin/cardmgr = u
/sbin/subdomain_parser = u
/usr/sbin/genprof = u
/usr/sbin/logprof = u
/usr/lib/YaST2/servers_non_y2/ag_genprof = u
/usr/lib/YaST2/servers_non_y2/ag_logprof = u
# these ones shouln't have their own profiles
/bin/awk = icn
/bin/cat = icn
/bin/chmod = icn
/bin/chown = icn
/bin/cp = icn
/bin/gawk = icn
/bin/grep = icn
/bin/gunzip = icn
/bin/gzip = icn
/bin/kill = icn
/bin/ln = icn
/bin/ls = icn
/bin/mkdir = icn
/bin/mv = icn
/bin/readlink = icn
/bin/rm = icn
/bin/sed = icn
/bin/touch = icn
/sbin/killall5 = icn
/usr/bin/find = icn
/usr/bin/killall = icn
/usr/bin/nice = icn
/usr/bin/perl = icn
/usr/bin/tr = icn
[required_hats]
^.+/apache(|2|2-prefork)$ = DEFAULT_URI HANDLING_UNTRUSTED_INPUT #a
^.+/httpd(|2|2-prefork)$ = DEFAULT_URI HANDLING_UNTRUSTED_INPUT
[defaulthat]
^.+/apache(|2|2-prefork)$ = DEFAULT_URI
^.+/httpd(|2|2-prefork)$ = DEFAULT_URI
[globs]
# /foo/bar/lib/libbaz.so -> /foo/bar/lib/lib*
/lib/lib[^\/]+so[^\/]*$ = /lib/lib*so*
# strip kernel version numbers from kernel module accesses
^/lib/modules/[^\/]+\/ = /lib/modules/*/
# strip pid numbers from /proc accesses
^/proc/\d+/ = /proc/*/
# if it looks like a home directory, glob out the username
^/home/[^\/]+ = /home/*
# if they use any perl modules, grant access to all
^/usr/lib/perl5/.+$ = /usr/lib/perl5/**
# locale foo
^/usr/lib/locale/.+$ = /usr/lib/locale/**
^/usr/share/locale/.+$ = /usr/share/locale/**
# timezone fun
^/usr/share/zoneinfo/.+$ = /usr/share/zoneinfo/**
# /foobar/fonts/baz -> /foobar/fonts/**
/fonts/.+$ = /fonts/**
# turn /foo/bar/baz.8907234 into /foo/bar/baz.*
# BUGBUG - this one looked weird because it would suggest a glob for
# BUGBUG - libfoo.so.5.6.0 that looks like libfoo.so.5.6.*
# \.\d+$ = .*
# some various /etc/security poo -- dunno about these ones...
^/etc/security/_[^\/]+$ = /etc/security/*
^/lib/security/pam_filter/[^\/]+$ = /lib/security/pam_filter/*
^/lib/security/pam_[^\/]+\.so$ = /lib/security/pam_*.so
^/etc/pam.d/[^\/]+$ = /etc/pam.d/*
^/etc/profile.d/[^\/]+\.sh$ = /etc/profile.d/*.sh

View File

@@ -6,10 +6,11 @@ Created on Jun 21, 2013
import sys
import unittest
sys.path.append('../lib')
import severity
sys.path.append('../')
sys.path.append('../apparmor')
import AppArmor.severity as severity
from AppArmor.common import AppArmorException
class Test(unittest.TestCase):
def testInvalid(self):
@@ -17,11 +18,11 @@ class Test(unittest.TestCase):
rank = s.rank('/dev/doublehit', 'i')
self.assertEqual(rank, 10, 'Wrong')
try:
broken = severity.Severity('severity_broken.db')
rank = s.rank('CAP_UNKOWN')
rank = s.rank('CAP_K*')
except ValueError:
broken = severity.Severity('severity_broken.db')
except AppArmorException:
pass
rank = s.rank('CAP_UNKOWN')
rank = s.rank('CAP_K*')
def testRank_Test(self):
z = severity.Severity()

5
apparmor/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
'''
Created on Jun 27, 2013
@author: kshitij
'''

173
apparmor/aa.py Normal file
View File

@@ -0,0 +1,173 @@
#mv $0 bed/ now
#Line 470
import os
import re
import sys
import apparmor.config
import apparmor.severity
import LibAppArmor
from AppArmor.common import AppArmorException, error, debug, msg, open_file_read, valid_path
DEBUGGING = False
CONFDIR = '/etc/apparmor'
running_under_genprof = False
unimplemented_warning = False
# The operating mode: yast or text, text by default
UI_mode = 'text'
# The database for severity
sev_db = None
# The file to read log messages from
### Was our
filename = None
cfg = None
repo_cfg = None
parser = None
ldd = None
logger = None
profile_dir = None
extra_profile_dir = None
### end our
# To keep track of previously included profile fragments
include = dict()
existing_profiles = dict()
seen_events = 0 # was our
# To store the globs entered by users so they can be provided again
user_globs = []
## Variables used under logprof
### Were our
t = dict()
transitions = dict()
aa = dict() # Profiles originally in sd, replace by aa
original_aa = dict()
extras = dict() # Inactive profiles from extras
### end our
log = []
pid = None
seen = dir()
profile_changes = dict()
prelog = dict()
log = dict()
changed = dict()
created = []
helpers = dict() # Preserve this between passes # was our
### logprof ends
filelist = dict() # File level variables and stuff in config files
AA_MAY_EXEC = 1
AA_MAY_WRITE = 2
AA_MAY_READ = 4
AA_MAY_APPEND = 8
AA_MAY_LINK = 16
AA_MAY_LOCK = 32
AA_EXEC_MMAP = 64
AA_EXEC_UNSAFE = 128
AA_EXEC_INHERIT = 256
AA_EXEC_UNCONFINED = 512
AA_EXEC_PROFILE = 1024
AA_EXEC_CHILD = 2048
AA_EXEC_NT = 4096
AA_LINK_SUBSET = 8192
AA_OTHER_SHIFT = 14
AA_USER_MASK = 16384 - 1
AA_EXEC_TYPE = (AA_MAY_EXEC | AA_EXEC_UNSAFE | AA_EXEC_INHERIT |
AA_EXEC_UNCONFINED | AA_EXEC_PROFILE | AA_EXEC_CHILD | AA_EXEC_NT)
ALL_AA_EXEC_TYPE = AA_EXEC_TYPE # The same value
# Modes and their values
MODE_HASH = {'x': AA_MAY_EXEC, 'X': AA_MAY_EXEC,
'w': AA_MAY_WRITE, 'W': AA_MAY_WRITE,
'r': AA_MAY_READ, 'R': AA_MAY_READ,
'a': AA_MAY_APPEND, 'A': AA_MAY_APPEND,
'l': AA_MAY_LINK, 'L': AA_MAY_LINK,
'k': AA_MAY_LOCK, 'K': AA_MAY_LOCK,
'm': AA_EXEC_MMAP, 'M': AA_EXEC_MMAP,
'i': AA_EXEC_INHERIT, 'I': AA_EXEC_INHERIT,
'u': AA_EXEC_UNCONFINED + AA_EXEC_UNSAFE, # Unconfined + Unsafe
'U': AA_EXEC_UNCONFINED,
'p': AA_EXEC_PROFILE + AA_EXEC_UNSAFE, # Profile + unsafe
'P': AA_EXEC_PROFILE,
'c': AA_EXEC_CHILD + AA_EXEC_UNSAFE, # Child + Unsafe
'C': AA_EXEC_CHILD,
'n': AA_EXEC_NT + AA_EXEC_UNSAFE,
'N': AA_EXEC_NT
}
# Used by netdomain to identify the operation types
OPERATION_TYPES = {
# New socket names
'create': 'net',
'post_create': 'net',
'bind': 'net',
'connect': 'net',
'listen': 'net',
'accept': 'net',
'sendmsg': 'net',
'recvmsg': 'net',
'getsockname': 'net',
'getpeername': 'net',
'getsockopt': 'net',
'setsockopt': 'net',
'sock_shutdown': 'net'
}
ARROWS = {'A': 'UP', 'B': 'DOWN', 'C': 'RIGHT', 'D': 'LEFT'}
def opt_type(operation):
"""Returns the operation type if known, unkown otherwise"""
operation_type = OPERATION_TYPES.get(operation, 'unknown')
return operation_type
def getkey():
"""Returns the pressed key"""
# Used incase of Y or N without pressing enter
## Needs to be done using curses? read a single key
pass
def check_for_apparmor():
"""Finds and returns the mointpoint for apparmor None otherwise"""
filesystem = '/proc/filesystems'
mounts = '/proc/mounts'
support_securityfs = False
aa_mountpoint = None
regex_securityfs = re.compile('^\S+\s+(\S+)\s+securityfs\s')
if valid_path(filesystem):
with open_file_read(filesystem) as f_in:
for line in f_in:
if 'securityfs' in line:
support_securityfs = True
if valid_path(mounts):
with open_file_read(mounts) as f_in:
for line in f_in:
if support_securityfs:
match = regex_securityfs(line)
if match:
mountpoint = match.groups()[0] + '/apparmor'
if valid_path(mountpoint):
aa_mountpoint = mountpoint
# Check if apparmor is actually mounted there
if not valid_path(aa_mountpoint + '/profiles'):
aa_mountpoint = None
return aa_mountpoint
def which(file):
"""Returns the executable fullpath for the file None otherwise"""
env_dirs = os.getenv('PATH').split(':')
for env_dir in env_dirs:
env_path = env_dir + '/' + file
# Test if the path is executable or not
if os.access(env_path, os.X_OK):
return env_path
return None

126
apparmor/common.py Normal file
View File

@@ -0,0 +1,126 @@
from __future__ import print_function
import codecs
import glob
import os
import subprocess
import sys
DEBUGGING = False
#
# Utility classes
#
class AppArmorException(Exception):
'''This class represents AppArmor exceptions'''
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
#
# Utility functions
#
def error(out, exit_code=1, do_exit=True):
'''Print error message and exit'''
try:
print("ERROR: %s" % (out), file=sys.stderr)
except IOError:
pass
if do_exit:
sys.exit(exit_code)
def warn(out):
'''Print warning message'''
try:
print("WARN: %s" % (out), file=sys.stderr)
except IOError:
pass
def msg(out, output=sys.stdout):
'''Print message'''
try:
print("%s" % (out), file=output)
except IOError:
pass
def debug(out):
'''Print debug message'''
global DEBUGGING
if DEBUGGING:
try:
print("DEBUG: %s" % (out), file=sys.stderr)
except IOError:
pass
def cmd(command):
'''Try to execute the given command.'''
debug(command)
try:
sp = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
except OSError as ex:
return [127, str(ex)]
if sys.version_info[0] >= 3:
out = sp.communicate()[0].decode('ascii', 'ignore')
else:
out = sp.communicate()[0]
return [sp.returncode, out]
def cmd_pipe(command1, command2):
'''Try to pipe command1 into command2.'''
try:
sp1 = subprocess.Popen(command1, stdout=subprocess.PIPE)
sp2 = subprocess.Popen(command2, stdin=sp1.stdout)
except OSError as ex:
return [127, str(ex)]
if sys.version_info[0] >= 3:
out = sp2.communicate()[0].decode('ascii', 'ignore')
else:
out = sp2.communicate()[0]
return [sp2.returncode, out]
def valid_path(path):
'''Valid path'''
# No relative paths
m = "Invalid path: %s" % (path)
if not path.startswith('/'):
debug("%s (relative)" % (m))
return False
if '"' in path: # We double quote elsewhere
return False
try:
os.path.normpath(path)
except Exception:
debug("%s (could not normalize)" % (m))
return False
return True
def get_directory_contents(path):
'''Find contents of the given directory'''
if not valid_path(path):
return None
files = []
for f in glob.glob(path + "/*"):
files.append(f)
files.sort()
return files
def open_file_read(path):
'''Open specified file read-only'''
try:
orig = codecs.open(path, 'r', "UTF-8")
except Exception:
raise
return orig

247
apparmor/config.py Normal file
View File

@@ -0,0 +1,247 @@
from __future__ import with_statement
import os
import shlex
import shutil
import stat
import sys
import tempfile
if sys.version_info < (3,0):
import ConfigParser as configparser
else:
import configparser
from AppArmor.common import AppArmorException, warn, msg, open_file_read
CONF_DIR = '/etc/apparmor'
CFG = None
REPO_CFG = None
SHELL_FILES = ['easyprof.conf', 'notify.conf', 'parser.conf', 'subdomain.conf']
class Config:
def __init__(self, conf_type):
# The type of config file that'll be read and/or written
self.conf_type = conf_type
self.input_file = None
def new_config(self):
if self.conf_type == 'shell':
config = {'': dict()}
else:
config = configparser.ConfigParser()
return config
def read_config(self, filename):
"""Reads the file and returns a config[section][attribute]=property object"""
# LP: Bug #692406
# Explicitly disabled repository
filepath = CONF_DIR + '/' + filename
self.input_file = filepath
if filename == "repository.conf":
config = dict()
config['repository'] = {'enabled': 'no'}
elif self.conf_type == 'shell':
config = self.read_shell(filepath)
else:
config = configparser.ConfigParser()
# Set the option form to string -prevents forced conversion to lowercase
#config.optionxform = str
if sys.version_info > (3,0):
config.read(filepath)
else:
config.readfp(open_file_read(filepath))
return config
def write_config(self, filename, config):
"""Writes the given config to the specified file"""
filepath = CONF_DIR + '/' + filename
permission_600 = stat.S_IRUSR | stat.S_IWUSR # Owner read and write
try:
# Open a temporary file in the CONF_DIR to write the config file
config_file = tempfile.NamedTemporaryFile('w', prefix='aa_temp', delete=False, dir=CONF_DIR)
if os.path.exists(self.input_file):
# Copy permissions from an existing file to temporary file
shutil.copymode(self.input_file, config_file.name)
else:
# If no existing permission set the file permissions as 0600
os.chmod(config_file.name, permission_600)
if self.conf_type == 'shell':
self.write_shell(filepath, config_file, config)
else:
self.write_configparser(filepath, config_file, config)
#config.write(config_file)
config_file.close()
except IOError:
raise AppArmorException("Unable to write to %s"%filename)
else:
# Replace the target config file with the temporary file
os.rename(config_file.name, filepath)
def find_first_file(self, file_list):
"""Returns name of first matching file None otherwise"""
filename = None
if filename:
for file in file_list.split():
if os.path.isfile(file):
filename = file
break
return filename
def find_first_dir(self, dir_list):
"""Returns name of first matching directory None otherwise"""
dirname = None
if dir_list:
for direc in dir_list.split():
if os.path.isdir(direc):
dirname = direc
break
return dirname
def read_shell(self, filepath):
"""Reads the shell type conf files and returns config[''][option]=value"""
config = {'': dict()}
with open_file_read(filepath) as file:
for line in file:
result = shlex.split(line, True)
# If not a comment of empty line
if result:
# option="value" or option=value type
if '=' in result[0]:
option, value = result[0].split('=')
# option type
else:
option = result[0]
value = None
config[''][option] = value
return config
def write_shell(self, filepath, f_out, config):
"""Writes the config object in shell file format"""
# All the options in the file
options = [key for key in config[''].keys()]
# If a previous file exists modify it keeping the comments
if os.path.exists(self.input_file):
with open_file_read(self.input_file) as f_in:
for line in f_in:
result = shlex.split(line, True)
# If line is not empty or comment
if result:
# If option=value or option="value" type
if '=' in result[0]:
option, value = result[0].split('=')
if '#' in line:
comment = value.split('#', 1)[1]
comment = '#'+comment
else:
comment = ''
# If option exists in the new config file
if option in options:
# If value is different
if value != config[''][option]:
value_new = config[''][option]
if value_new != None:
# Update value
if '"' in line:
value_new = '"' + value_new + '"'
line = option + '=' + value_new + comment + '\n'
else:
# If option changed to option type from option=value type
line = option + comment + '\n'
f_out.write(line)
# Remove from remaining options list
options.remove(option)
else:
# If option type
option = result[0]
value = None
# If option exists in the new config file
if option in options:
# If its no longer option type
if config[''][option] != None:
value = config[''][option]
line = option + '=' + value + '\n'
f_out.write(line)
# Remove from remaining options list
options.remove(option)
else:
# If its empty or comment copy as it is
f_out.write(line)
# If any new options are present
if options:
for option in options:
value = config[''][option]
# option type entry
if value == None:
line = option + '\n'
# option=value type entry
else:
line = option + '=' + value + '\n'
f_out.write(line)
def write_configparser(self, filepath, f_out, config):
"""Writes/updates the given file with given config object"""
# All the sections in the file
sections = config.sections()
write = True
section = None
options = []
# If a previous file exists modify it keeping the comments
if os.path.exists(self.input_file):
with open_file_read(self.input_file) as f_in:
for line in f_in:
# If its a section
if line.lstrip().startswith('['):
# If any options from preceding section remain write them
if options:
for option in options:
line_new = ' ' + option + ' = ' + config[section][option] + '\n'
f_out.write(line_new)
options = []
if section in sections:
# Remove the written section from the list
sections.remove(section)
section = line.strip()[1:-1]
if section in sections:
# enable write for all entries in that section
write = True
options = config.options(section)
# write the section
f_out.write(line)
else:
# disable writing until next valid section
write = False
# If write enabled
elif write:
value = shlex.split(line, True)
# If the line is empty or a comment
if not value:
f_out.write(line)
else:
option, value = line.split('=', 1)
try:
# split any inline comments
value, comment = value.split('#', 1)
comment = '#' + comment
except ValueError:
comment = ''
if option.strip() in options:
if config[section][option.strip()] != value.strip():
value = value.replace(value, config[section][option.strip()])
line = option + '=' + value + comment
f_out.write(line)
options.remove(option.strip())
# If any options remain from the preceding section
if options:
for option in options:
line = ' ' + option + ' = ' + config[section][option] + '\n'
f_out.write(line)
options = []
# If any new sections are present
if section in sections:
sections.remove(section)
for section in sections:
f_out.write('\n['+section+']\n')
options = config.options(section)
for option in options:
line = ' ' + option + ' = ' + config[section][option] + '\n'
f_out.write(line)

View File

@@ -1,5 +1,7 @@
from __future__ import with_statement
import os
import re
from AppArmor.common import AppArmorException, error, debug, open_file_read, warn, msg
class Severity:
def __init__(self, dbname=None, default_rank=10):
@@ -16,7 +18,7 @@ class Severity:
for root, dirs, files in os.walk('/etc/apparmor.d'):
for file in files:
try:
with open(os.path.join(root, file), 'r') as f:
with open_file_read(os.path.join(root, file)) as f:
for line in f:
line.strip()
# Expected format is @{Variable} = value1 value2 ..
@@ -24,17 +26,23 @@ class Severity:
line = line.strip()
if '+=' in line:
line = line.split('+=')
try:
self.severity['VARIABLES'][line[0]] += [i.strip('"') for i in line[1].split()]
except KeyError:
raise AppArmorException("Variable %s was not previously declared, but is being assigned additional values" % line[0])
else:
line = line.split('=')
self.severity['VARIABLES'][line[0]] = self.severity['VARIABLES'].get(line[0], []) + [i.strip('"') for i in line[1].split()]
if line[0] in self.severity['VARIABLES'].keys():
raise AppArmorException("Variable %s was previously declared" % line[0])
self.severity['VARIABLES'][line[0]] = [i.strip('"') for i in line[1].split()]
except IOError:
raise IOError("unable to open file: %s"%file)
raise AppArmorException("unable to open file: %s" % file)
if not dbname:
return None
try:
database = open(dbname, 'r')
database = open_file_read(dbname)#open(dbname, 'r')
except IOError:
raise IOError("Could not open severity database %s"%dbname)
raise AppArmorException("Could not open severity database %s" % dbname)
for line in database:
line = line.strip() # or only rstrip and lstrip?
if line == '' or line.startswith('#') :
@@ -44,10 +52,12 @@ class Severity:
path, read, write, execute = line.split()
read, write, execute = int(read), int(write), int(execute)
except ValueError:
raise ValueError("Insufficient values for permissions in line: %s\nin File: %s"%(line, dbname))
msg("\nFile: %s" % dbname)
raise AppArmorException("Insufficient values for permissions in line: %s" % (line))
else:
if read not in range(0,11) or write not in range(0,11) or execute not in range(0,11):
raise ValueError("Inappropriate values for permissions in line: %s\nin File: %s"%(line, dbname))
msg("\nFile:"%(dbname))
raise AppArmorException("Inappropriate values for permissions in line: %s" % line)
path = path.lstrip('/')
if '*' not in path:
self.severity['FILES'][path] = {'r': read, 'w': write, 'x': execute}
@@ -68,13 +78,16 @@ class Severity:
resource, severity = line.split()
severity = int(severity)
except ValueError:
raise ValueError("No severity value present in line: %s\nin File: %s"%(line, dbname))
msg("\nFile: %s" % dbname)
raise AppArmorException("No severity value present in line: %s" % (line))
else:
if severity not in range(0,11):
raise ValueError("Inappropriate severity value present in line: %s\nin File: %s"%(line, dbname))
msg("\nFile: %s" % dbname)
raise AppArmorException("Inappropriate severity value present in line: %s" % (line))
self.severity['CAPABILITIES'][resource] = severity
else:
raise ValueError("Unexpected database line: %s \nin File: %s"%(line,dbname))
msg("\nFile: %s" % dbname)
raise AppArmorException("Unexpected database line: %s" % (line))
database.close()
def convert_regexp(self, path):
@@ -83,7 +96,7 @@ class Severity:
internal_glob = '__KJHDKVZH_AAPROF_INTERNAL_GLOB_SVCUZDGZID__'
regex = path
for character in ['.', '+', '[', ']']: # Escape the regex symbols
regex = regex.replace(character, "\%s"%character)
regex = regex.replace(character, "\%s" % character)
# Convert the ** to regex
regex = regex.replace('**', '.'+internal_glob)
# Convert the * to regex
@@ -101,6 +114,7 @@ class Severity:
if resource in self.severity['CAPABILITIES'].keys():
return self.severity['CAPABILITIES'][resource]
# raise ValueError("unexpected capability rank input: %s"%resource)
warn("unknown capability: %s" % resource)
return self.severity['DEFAULT_RANK']
@@ -158,7 +172,7 @@ class Severity:
elif resource[0:4] == 'CAP_': # capability resource
return self.handle_capability(resource)
else:
raise ValueError("Unexpected rank input: %s"%resource)
raise AppArmorException("Unexpected rank input: %s" % resource)
def handle_variable_rank(self, resource, mode):
"""Returns the max possible rank for file resources containing variables"""
@@ -176,7 +190,7 @@ class Severity:
rank = rank_new
return rank
else:
print(resource)
#print(resource)
#print(self.handle_file(resource, mode))
return self.handle_file(resource, mode)

View File

@@ -1 +0,0 @@
#!/usr/bin/python3

View File

@@ -1,219 +0,0 @@
import configparser
import os
import shlex
import shutil
import stat
import tempfile
confdir = '/etc/apparmor'
cfg = None
repo_cfg = None
shell_files = ['easyprof.conf', 'notify.conf', 'parser.conf', 'subdomain.conf']
def read_config(filename, conf_type=None):
"""Reads the file and returns a config[section][attribute]=property object"""
# LP: Bug #692406
# Explicitly disabled repository
filepath = confdir + '/' + filename
if filename == "repository.conf":
config = dict()
config['repository'] = {'enabled': 'no'}
elif filename in shell_files or conf_type == 'shell':
config = read_shell(filepath)
else:
config = configparser.ConfigParser()
config.optionxform = str
config.read(filepath)
return config
def write_config(filename, config, conf_type=None):
"""Writes the given config to the specified file"""
filepath = confdir + '/' + filename
permission_600 = stat.S_IRUSR | stat.S_IWUSR # Owner read and write
try:
# Open a temporary file in the confdir to write the config file
config_file = tempfile.NamedTemporaryFile('w', prefix='aa_temp', delete=False, dir=confdir)
if os.path.exists(filepath):
# Copy permissions from an existing file to temporary file
shutil.copymode(filepath, config_file.name)
else:
# If no existing permission set the file permissions as 0600
os.chmod(config_file.name, permission_600)
write_shell(filepath, config_file, config)
if filename in shell_files or conf_type == 'shell':
write_shell(filepath, config_file, config)
else:
write_configparser(filepath, config_file, config)
#config.write(config_file)
config_file.close()
except IOError:
raise IOError("Unable to write to %s"%filename)
else:
# Replace the target config file with the temporary file
os.rename(config_file.name, filepath)
def find_first_file(file_list):
"""Returns name of first matching file None otherwise"""
# I don't understand why it searches the CWD, maybe I'll find out about it in some module
filename = None
if len(file_list):
for file in file_list.split():
if os.path.isfile(file):
filename = file
break
return filename
def find_first_dir(dir_list):
"""Returns name of first matching directory None otherwise"""
dirname = None
if (len(dir_list)):
for direc in dir_list.split():
if os.path.isdir(direc):
dirname = direc
break
return dirname
def read_shell(filepath):
"""Reads the shell type conf files and returns config[''][option]=value"""
config = {'': dict()}
with open(filepath, 'r') as file:
for line in file:
result = shlex.split(line, True)
# If not a comment of empty line
if result != []:
# option="value" or option=value type
if '=' in result[0]:
option, value = result[0].split('=')
# option type
else:
option = result[0]
value = None
config[''][option] = value
return config
def write_shell(filepath, f_out, config):
"""Writes the config object in shell file format"""
# All the options in the file
options = [key for key in config[''].keys()]
# If a previous file exists modify it keeping the comments
if os.path.exists(filepath):
with open(filepath, 'r') as f_in:
for line in f_in:
result = shlex.split(line, True)
# If line is not empty or comment
if result != []:
# If option=value or option="value" type
if '=' in result[0]:
option, value = result[0].split('=')
# If option exists in the new config file
if option in options:
# If value is different
if value != config[''][option]:
value_new = config[''][option]
if value_new != None:
# Update value
if '"' in line:
value_new = '"' + value_new + '"'
line = option + '=' + value_new + '\n'
else:
# If option changed to option type from option=value type
line = option + '\n'
f_out.write(line)
# Remove from remaining options list
options.remove(option)
else:
# If option type
option = result[0]
value = None
# If option exists in the new config file
if option in options:
# If its no longer option type
if config[''][option] != None:
value = config[''][option]
line = option + '=' + value + '\n'
f_out.write(line)
# Remove from remaining options list
options.remove(option)
else:
# If its empty or comment copy as it is
f_out.write(line)
# If any new options are present
if options != []:
for option in options:
value = config[''][option]
# option type entry
if value == None:
line = option + '\n'
# option=value type entry
else:
line = option + '=' + value + '\n'
f_out.write(line)
def write_configparser(filepath, f_out, config):
# All the sections in the file
sections = config.sections()
write = True
section = None
options = []
# If a previous file exists modify it keeping the comments
if os.path.exists(filepath):
with open(filepath, 'r') as f_in:
for line in f_in:
# If its a section
if line.lstrip().startswith('['):
# If any options from preceding section remain write them
if options != []:
for option in options:
line_new = ' ' + option + ' = ' + config[section][option] + '\n'
f_out.write(line_new)
options = []
if section in sections:
# Remove the written section from the list
sections.remove(section)
section = line.strip()[1:-1]
if section in sections:
# enable write for all entries in that section
write = True
options = config.options(section)
# write the section
f_out.write(line)
else:
# disable writing until next valid section
write = False
# If write enabled
elif write:
value = shlex.split(line, True)
# If the line is empty or a comment
if value == []:
f_out.write(line)
else:
option, value = line.split('=', 1)
try:
# split any inline comments
value, comment = value.split('#', 1)
comment = '#' + comment
except ValueError:
comment = ''
if option.strip() in options:
if config[section][option.strip()] != value.strip():
value = value.replace(value, config[section][option.strip()])
line = option + '=' + value + comment
f_out.write(line)
options.remove(option.strip())
# If any options remain from the preceding section
if options != []:
for option in options:
line = ' ' + option + ' = ' + config[section][option] + '\n'
f_out.write(line)
options = []
# If any new sections are present
if section in sections:
sections.remove(section)
for section in sections:
f_out.write('\n['+section+']\n')
options = config.options(section)
for option in options:
line = ' ' + option + ' = ' + config[section][option] + '\n'
f_out.write(line)