2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-22 18:17:09 +00:00
apparmor/parser/tst/testlib.py
Steve Beattie b3bf36175d parser - rewrite caching tests in python unittest
This patch rewrites the caching test in python, using python's unittest
framework. It has been used with python 2.7 and python 3.3; python2.6
may have issues. It covers the tests in the existing caching.sh
test script (with the exception of the test that checks for when the
parser in $PATH is newer), as well as adding additional tests that
more extensively cover using a cache in an alternate location from
basedir. It also adds simple tests for the --create-cache-dir option
(along with that option's interaction with the alt-cache option).

(Some further work to be done is listed under TODO.)

Patch history:
  v1: - initial version
  v2: - create template base class
      - add keep_on_fail() decorator to keep temporary test files
        around after a test fails
      - don't dump raw cache file to failure output in
        test_cache_writing_updates_cache_file()
      - push run_cmd into template class
      - create run_cmd_check wrapper to run_cmd that adds an assertion
        check based on whether return code matches the expected rc
        (the valgrind tests only want to verify that the rc is not a
        specific set of values, hence the separate wrapper function)
      - similarly, add a check to run_cmd_check for verifying the output
        contains a specific string, also simplifying many of the caching
        tests.
      - create testlib.write_file() to simplify writing file

Signed-off-by: Steve Beattie <steve@nxnw.org>
Acked-by: Christian Boltz <apparmor@cboltz.de>
2013-10-15 17:10:12 -07:00

187 lines
5.8 KiB
Python

#!/usr/bin/env python
# ------------------------------------------------------------------
#
# Copyright (C) 2013 Canonical Ltd.
# Author: Steve Beattie <steve@nxnw.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License published by the Free Software Foundation.
#
# ------------------------------------------------------------------
import os
import shutil
import signal
import subprocess
import tempfile
import time
import unittest
TIMEOUT_ERROR_CODE = 152
DEFAULT_PARSER = '../apparmor_parser'
# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
# This is needed so that the subprocesses that produce endless output
# actually quit when the reader goes away.
def subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not
# what non-Python subprocesses expect.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
class AATestTemplate(unittest.TestCase):
'''Stub class for use by test scripts'''
debug = False
do_cleanup = True
def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
stdin=None, timeout=120, expected_rc=0, expected_string=None):
'''Wrapper around run_cmd that checks the rc code against
expected_rc and for expected strings in the output if
passed. The valgrind tests generally don't care what the
rc is as long as it's not a specific set of return codes,
so can't push the check directly into run_cmd().'''
rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
self.assertEqual(rc, expected_rc, "Got return code %d, expected %d\nCommand run: %s\nOutput: %s" % (rc, expected_rc, (' '.join(command)), report))
if expected_string:
self.assertIn(expected_string, report, 'Expected message "%s", got: \n%s' % (expected_string, report))
return report
def run_cmd(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
stdin=None, timeout=120):
'''Try to execute given command (array) and return its stdout, or
return a textual error if it failed.'''
if self.debug:
print('\n===> Running command: \'%s\'' % (' '.join(command)))
try:
sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
close_fds=True, preexec_fn=subprocess_setup)
except OSError as e:
return [127, str(e)]
timeout_communicate = TimeoutFunction(sp.communicate, timeout)
out, outerr = (None, None)
try:
out, outerr = timeout_communicate(input)
rc = sp.returncode
except TimeoutFunctionException as e:
sp.terminate()
outerr = b'test timed out, killed'
rc = TIMEOUT_ERROR_CODE
# Handle redirection of stdout
if out is None:
out = b''
# Handle redirection of stderr
if outerr is None:
outerr = b''
report = out.decode('utf-8') + outerr.decode('utf-8')
return [rc, report]
# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
class TimeoutFunctionException(Exception):
"""Exception to raise on a timeout"""
pass
class TimeoutFunction:
def __init__(self, function, timeout):
self.timeout = timeout
self.function = function
def handle_timeout(self, signum, frame):
raise TimeoutFunctionException()
def __call__(self, *args, **kwargs):
old = signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.timeout)
try:
result = self.function(*args, **kwargs)
finally:
signal.signal(signal.SIGALRM, old)
signal.alarm(0)
return result
def filesystem_time_resolution():
'''detect whether the filesystem stores sub 1 second timestamps'''
default_diff = 0.1
result = (True, default_diff)
tmp_dir = tempfile.mkdtemp(prefix='aa-caching-nanostamp-')
try:
last_stamp = None
for i in range(10):
s = None
with open(os.path.join(tmp_dir, 'test.%d' % i), 'w+') as f:
s = os.fstat(f.fileno())
if (s.st_mtime == last_stamp):
print('\n===> WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
result = (False, 1.0)
break
last_stamp = s.st_mtime
time.sleep(default_diff)
except:
pass
finally:
if os.path.exists(tmp_dir):
shutil.rmtree(tmp_dir)
return result
def read_features_dir(path):
result = ''
if not os.path.exists(path) or not os.path.isdir(path):
return result
for name in os.listdir(path):
entry = os.path.join(path, name)
result += '%s {' % name
if os.path.isfile(entry):
with open(entry, 'r') as f:
# don't need extra '\n' here as features file contains it
result += '%s' % (f.read())
elif os.path.isdir(entry):
result += '%s' % (read_features_dir(entry))
result += '}\n'
return result
def touch(path):
return os.utime(path, None)
def write_file(path, contents):
'''write contents to path'''
with open(path, 'w+') as f:
f.write(contents)
def keep_on_fail(unittest_func):
'''wrapping function for unittest testcases to detect failure
and leave behind test files in tearDown(); to be used as a
decorator'''
def new_unittest_func(self):
try:
unittest_func(self)
except Exception:
self.do_cleanup = False
raise
return new_unittest_func