2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-09-02 15:25:27 +00:00

adjust sandbox code:

- for python3
- to add xpra support
- refactoring
- cleanups
This commit is contained in:
Jamie Strandboge
2012-08-23 20:49:12 -05:00
3 changed files with 210 additions and 90 deletions

View File

@@ -33,5 +33,5 @@ if __name__ == "__main__":
else: else:
rc, report = apparmor.sandbox.run_sandbox(args, opt) rc, report = apparmor.sandbox.run_sandbox(args, opt)
print report apparmor.common.msg(report)
sys.exit(rc) sys.exit(rc)

View File

@@ -8,11 +8,11 @@
# #
# ------------------------------------------------------------------ # ------------------------------------------------------------------
from __future__ import print_function
import subprocess import subprocess
import sys import sys
DEBUGGING = False DEBUGGING = False
DEBUGGING = True
# #
# Utility classes # Utility classes
@@ -31,7 +31,7 @@ class AppArmorException(Exception):
def error(out, exit_code=1, do_exit=True): def error(out, exit_code=1, do_exit=True):
'''Print error message and exit''' '''Print error message and exit'''
try: try:
print >> sys.stderr, "ERROR: %s" % (out) print("ERROR: %s" % (out), file=sys.stderr)
except IOError: except IOError:
pass pass
@@ -41,14 +41,14 @@ def error(out, exit_code=1, do_exit=True):
def warn(out): def warn(out):
'''Print warning message''' '''Print warning message'''
try: try:
print >> sys.stderr, "WARN: %s" % (out) print("WARN: %s" % (out), file=sys.stderr)
except IOError: except IOError:
pass pass
def msg(out, output=sys.stdout): def msg(out, output=sys.stdout):
'''Print message''' '''Print message'''
try: try:
print >> output, "%s" % (out) print("%s" % (out), file=sys.stdout)
except IOError: except IOError:
pass pass
@@ -57,7 +57,7 @@ def debug(out):
global DEBUGGING global DEBUGGING
if DEBUGGING: if DEBUGGING:
try: try:
print >> sys.stderr, "DEBUG: %s" % (out) print("DEBUG: %s" % (out), file=sys.stderr)
except IOError: except IOError:
pass pass
@@ -67,20 +67,29 @@ def cmd(command):
try: try:
sp = subprocess.Popen(command, stdout=subprocess.PIPE, sp = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) stderr=subprocess.STDOUT)
except OSError, ex: except OSError as ex:
return [127, str(ex)] return [127, str(ex)]
if sys.version_info[0] >= 3:
out = sp.communicate()[0].decode('ascii', 'ignore')
else:
out = sp.communicate()[0] out = sp.communicate()[0]
return [sp.returncode, out] return [sp.returncode, out]
def cmd_pipe(command1, command2): def cmd_pipe(command1, command2):
'''Try to pipe command1 into command2.''' '''Try to pipe command1 into command2.'''
try: try:
sp1 = subprocess.Popen(command1, stdout=subprocess.PIPE) sp1 = subprocess.Popen(command1, stdout=subprocess.PIPE)
sp2 = subprocess.Popen(command2, stdin=sp1.stdout) sp2 = subprocess.Popen(command2, stdin=sp1.stdout)
except OSError, ex: except OSError as ex:
return [127, str(ex)] return [127, str(ex)]
if sys.version_info[0] >= 3:
out = sp2.communicate()[0].decode('ascii', 'ignore')
else:
out = sp2.communicate()[0] out = sp2.communicate()[0]
return [sp2.returncode, out] return [sp2.returncode, out]

View File

@@ -13,27 +13,30 @@ import apparmor.easyprof
import optparse import optparse
import os import os
import pwd import pwd
import re
import sys import sys
import tempfile import tempfile
import time import time
DEBUGGING = False
def check_requirements(binary): def check_requirements(binary):
'''Verify necessary software is installed''' '''Verify necessary software is installed'''
exes = ['Xephyr', 'matchbox-window-manager', binary] exes = ['xset', # for detecting free X display
'aa-easyprof', # for templates
'aa-exec', # for changing profile
'sudo', # eventually get rid of this
binary]
for e in exes: for e in exes:
debug("Searching for '%s'" % e) debug("Searching for '%s'" % e)
rc, report = cmd(['which', e]) rc, report = cmd(['which', e])
if rc != 0: if rc != 0:
error("Could not find '%s'" % e, do_exit=False) error("Could not find '%s'" % e, do_exit=False)
return False return False
return True return True
def parse_args(args=None, parser=None): def parse_args(args=None, parser=None):
'''Parse arguments''' '''Parse arguments'''
global DEBUGGING
if parser == None: if parser == None:
parser = optparse.OptionParser() parser = optparse.OptionParser()
@@ -42,6 +45,10 @@ def parse_args(args=None, parser=None):
default=False, default=False,
help='Run in isolated X server', help='Run in isolated X server',
action='store_true') action='store_true')
parser.add_option('--xserver',
dest='xserver',
default='xpra',
help='Nested X server to use (default is xpra)')
parser.add_option('-d', '--debug', parser.add_option('-d', '--debug',
dest='debug', dest='debug',
default=False, default=False,
@@ -53,15 +60,25 @@ def parse_args(args=None, parser=None):
help='Resolution for X application') help='Resolution for X application')
(my_opt, my_args) = parser.parse_args() (my_opt, my_args) = parser.parse_args()
if my_opt.debug: if my_opt.debug == True:
DEBUGGING = True apparmor.common.DEBUGGING = True
if my_opt.template == "default":
if my_opt.withx:
if my_opt.xserver.lower() != 'xpra' and \
my_opt.xserver.lower() != 'xephyr':
error("Invalid server '%s'. Use 'xpra' or 'xephyr'" % \
my_opt.xserver)
my_opt.template = "sandbox-x"
else:
my_opt.template = "sandbox"
return (my_opt, my_args) return (my_opt, my_args)
def gen_policy_name(binary): def gen_policy_name(binary):
'''Generate a temporary policy based on the binary name''' '''Generate a temporary policy based on the binary name'''
# TODO: this may not be good enough return "sandbox-%s%s" % (pwd.getpwuid(os.getuid())[0],
return "sandbox-%s-%s" % (pwd.getpwuid(os.getuid())[0], re.sub(r'/', '_', binary))
os.path.basename(binary))
def aa_exec(command, opt): def aa_exec(command, opt):
'''Execute binary under specified policy''' '''Execute binary under specified policy'''
@@ -80,8 +97,12 @@ def aa_exec(command, opt):
# TODO: get rid of sudo # TODO: get rid of sudo
tmp = tempfile.NamedTemporaryFile(prefix = '%s-' % policy_name) tmp = tempfile.NamedTemporaryFile(prefix = '%s-' % policy_name)
if sys.version_info[0] >= 3:
tmp.write(bytes(policy, 'utf-8'))
else:
tmp.write(policy) tmp.write(policy)
tmp.flush() tmp.flush()
debug("using '%s' template" % opt.template) debug("using '%s' template" % opt.template)
rc, report = cmd(['sudo', 'apparmor_parser', '-r', tmp.name]) rc, report = cmd(['sudo', 'apparmor_parser', '-r', tmp.name])
if rc != 0: if rc != 0:
@@ -91,26 +112,61 @@ def aa_exec(command, opt):
rc, report = cmd(args) rc, report = cmd(args)
return rc, report return rc, report
def find_free_x_display():
# TODO: detect/track and get an available display
x_display = ":1"
return x_display
def run_sandbox(command, opt): def run_sandbox(command, opt):
'''Run application''' '''Run application'''
# aa-exec # aa-exec
#opt.template = "sandbox-x"
rc, report = aa_exec(command, opt) rc, report = aa_exec(command, opt)
return rc, report return rc, report
def run_xsandbox(command, opt): class SandboxXserver():
'''Run X application in a sandbox''' def __init__(self, resolution, title):
# Find a display to run on self.resolution = resolution
x_display = find_free_x_display() self.title = title
self.pids = []
self.find_free_x_display()
debug (os.environ["DISPLAY"]) # TODO: for now, drop Unity's globalmenu proxy since it doesn't work
# right in the application.
os.environ["UBUNTU_MENUPROXY"] = ""
# first, start X def find_free_x_display(self):
'''Find a free X display'''
display = ""
current = os.environ["DISPLAY"]
for i in range(1,257): # TODO: this puts an artificial limit of 256
# sandboxed applications
tmp = ":%d" % i
os.environ["DISPLAY"] = tmp
rc, report = cmd(['xset', '-q'])
if rc != 0:
display = tmp
break
os.environ["DISPLAY"] = current
if display == "":
raise AppArmorException("Could not find available X display")
self.display = display
def cleanup(self):
'''Cleanup our forked pids, etc'''
# kill server now. It should've terminated, but be sure
for pid in self.pids:
os.kill(pid, 15)
os.waitpid(pid, 0)
def start(self):
'''start() should be overridden'''
class SandboxXephyr(SandboxXserver):
def start(self):
for e in ['Xephyr', 'matchbox-window-manager']:
debug("Searching for '%s'" % e)
rc, report = cmd(['which', e])
if rc != 0:
raise AppArmorException("Could not find '%s'" % e)
'''Start a Xephyr server'''
listener_x = os.fork() listener_x = os.fork()
if listener_x == 0: if listener_x == 0:
# TODO: break into config file? Which are needed? # TODO: break into config file? Which are needed?
@@ -127,58 +183,113 @@ def run_xsandbox(command, opt):
] ]
x_args = ['-nolisten', 'tcp', x_args = ['-nolisten', 'tcp',
'-screen', opt.resolution, '-screen', self.resolution,
'-br', # black background '-br', # black background
'-reset', # reset after last client exists '-reset', # reset after last client exists
'-terminate', # terminate at server reset '-terminate', # terminate at server reset
'-title', command[0], '-title', self.title,
] + x_exts + x_extra_args ] + x_exts + x_extra_args
args = ['/usr/bin/Xephyr'] + x_args + [x_display] args = ['/usr/bin/Xephyr'] + x_args + [self.display]
debug(" ".join(args)) debug(" ".join(args))
sys.stderr.flush() sys.stderr.flush()
os.execv(args[0], args) os.execv(args[0], args)
sys.exit(0) sys.exit(0)
self.pids.append(listener_x)
# save environment time.sleep(1) # FIXME: detect if running
old_display = os.environ["DISPLAY"]
old_cwd = os.getcwd()
# update environment
os.environ["DISPLAY"] = x_display
debug("DISPLAY is now '%s'" % os.environ["DISPLAY"])
time.sleep(0.2) # FIXME: detect if running
# Next, start the window manager # Next, start the window manager
sys.stdout.flush() sys.stdout.flush()
os.chdir(os.environ["HOME"]) os.chdir(os.environ["HOME"])
listener_wm = os.fork() listener_wm = os.fork()
if listener_wm == 0: if listener_wm == 0:
# update environment
os.environ["DISPLAY"] = self.display
debug("DISPLAY is now '%s'" % os.environ["DISPLAY"])
args = ['/usr/bin/matchbox-window-manager', '-use_titlebar', 'no'] args = ['/usr/bin/matchbox-window-manager', '-use_titlebar', 'no']
debug(" ".join(args)) debug(" ".join(args))
sys.stderr.flush() sys.stderr.flush()
os.execv(args[0], args) os.execv(args[0], args)
sys.exit(0) sys.exit(0)
time.sleep(0.2) # FIXME: detect if running self.pids.append(listener_wm)
time.sleep(1) # FIXME: detect if running
class SandboxXpra(SandboxXserver):
def cleanup(self):
cmd(['xpra', 'stop', self.display])
SandboxXserver.cleanup(self)
def start(self):
for e in ['xpra']:
debug("Searching for '%s'" % e)
rc, report = cmd(['which', e])
if rc != 0:
raise AppArmorException("Could not find '%s'" % e)
listener_x = os.fork()
if listener_x == 0:
x_args = ['--no-daemon',
#'--no-mmap', # for security?
'--no-clipboard',
'--no-pulseaudio']
args = ['/usr/bin/xpra', 'start', self.display] + x_args
debug(" ".join(args))
cmd(args)
#sys.stderr.flush()
#os.execv(args[0], args)
sys.exit(0)
self.pids.append(listener_x)
time.sleep(2) # FIXME: detect if running
# Next, attach to xpra
sys.stdout.flush()
os.chdir(os.environ["HOME"])
listener_attach = os.fork()
if listener_attach == 0:
args = ['/usr/bin/xpra', 'attach', self.display,
'--title=%s' % self.title]
debug(" ".join(args))
cmd(args)
#sys.stderr.flush()
#os.execv(args[0], args)
sys.exit(0)
self.pids.append(listener_attach)
def run_xsandbox(command, opt):
'''Run X application in a sandbox'''
# save environment
old_display = os.environ["DISPLAY"]
debug ("DISPLAY=%s" % old_display)
old_cwd = os.getcwd()
# first, start X
if opt.xserver.lower() == "xephyr":
x = SandboxXephyr(opt.resolution, command[0])
else:
x = SandboxXpra(opt.resolution, command[0])
x.start()
# update environment
os.environ["DISPLAY"] = x.display
debug("DISPLAY is now '%s'" % os.environ["DISPLAY"])
# aa-exec # aa-exec
#opt.template = "sandbox-x" try:
rc, report = aa_exec(command, opt) rc, report = aa_exec(command, opt)
except:
x.cleanup()
raise
x.cleanup()
# reset environment # reset environment
os.chdir(old_cwd)
os.environ["DISPLAY"] = old_display os.environ["DISPLAY"] = old_display
debug("DISPLAY is now '%s'" % os.environ["DISPLAY"]) debug("DISPLAY is now '%s'" % os.environ["DISPLAY"])
os.chdir(old_cwd)
# kill server now. It should've terminated, but be sure
cmd(['kill', '-15', "%d" % listener_wm])
os.kill(listener_wm, 15)
os.waitpid(listener_wm, 0)
cmd(['kill', '-15', "%d" % listener_x])
os.kill(listener_x, 15)
os.waitpid(listener_x, 0)
return rc, report return rc, report