2
0
mirror of https://gitlab.com/apparmor/apparmor synced 2025-08-22 10:07:12 +00:00

Merge add userspace support for io_uring mediation

```
io_uring rules have the following format:

io_uring [<access_mode>] [<label>],
access_mode := 'sqpoll'|'override_creds'
label := 'label' '=' <target label>
```

You can use the following kernel tree with the io_uring mediation patch to test this feature https://gitlab.com/georgiag/apparmor-kernel/-/commits/io_uring

MR: https://gitlab.com/apparmor/apparmor/-/merge_requests/993
Approved-by: John Johansen <john@jjmx.net>
Merged-by: John Johansen <john@jjmx.net>
This commit is contained in:
John Johansen 2023-06-29 21:38:02 +00:00
commit fef3eb3693
45 changed files with 1150 additions and 7 deletions

2
.gitignore vendored
View File

@ -61,6 +61,7 @@ parser/ptrace.o
parser/rule.o
parser/signal.o
parser/userns.o
parser/io_uring.o
parser/*.7
parser/*.5
parser/*.8
@ -256,6 +257,7 @@ tests/regression/apparmor/fd_inheritance
tests/regression/apparmor/fd_inheritor
tests/regression/apparmor/fork
tests/regression/apparmor/introspect
tests/regression/apparmor/io_uring
tests/regression/apparmor/link
tests/regression/apparmor/link_subset
tests/regression/apparmor/mkdir

View File

@ -172,6 +172,7 @@ key_fstype "fstype"
key_flags "flags"
key_srcname "srcname"
key_class "class"
key_tcontext "tcontext"
audit "audit"
/* network addrs */
@ -327,6 +328,7 @@ yy_flex_debug = 0;
{key_peer_profile} { BEGIN(safe_string); return(TOK_KEY_PEER_PROFILE); }
{key_label} { BEGIN(safe_string); return(TOK_KEY_LABEL); }
{key_peer_label} { BEGIN(safe_string); return(TOK_KEY_PEER_LABEL); }
{key_tcontext} { BEGIN(safe_string); return(TOK_KEY_PEER_LABEL); }
{key_family} { return(TOK_KEY_FAMILY); }
{key_sock_type} { return(TOK_KEY_SOCK_TYPE); }
{key_protocol} { return(TOK_KEY_PROTOCOL); }

View File

@ -0,0 +1 @@
[ 4584.703379] audit: type=1400 audit(1680266735.359:69): apparmor="DENIED" operation="uring_sqpoll" class="io_uring" profile="/root/apparmor/tests/regression/apparmor/io_uring" pid=1320 comm="io_uring" requested="sqpoll" denied="sqpoll"

View File

@ -0,0 +1,13 @@
START
File: testcase_io_uring_01.in
Event type: AA_RECORD_DENIED
Audit ID: 1680266735.359:69
Operation: uring_sqpoll
Mask: sqpoll
Denied Mask: sqpoll
Profile: /root/apparmor/tests/regression/apparmor/io_uring
Command: io_uring
PID: 1320
Class: io_uring
Epoch: 1680266735
Audit subid: 69

View File

@ -0,0 +1,4 @@
/root/apparmor/tests/regression/apparmor/io_uring {
io_uring sqpoll,
}

View File

@ -0,0 +1 @@
[ 4584.491076] audit: type=1400 audit(1680266735.147:63): apparmor="DENIED" operation="uring_override" class="io_uring" profile="/root/apparmor/tests/regression/apparmor/io_uring" pid=1193 comm="io_uring" requested="override_creds" denied="override_creds" tcontext="/root/apparmor/tests/regression/apparmor/io_uring"

View File

@ -0,0 +1,14 @@
START
File: testcase_io_uring_02.in
Event type: AA_RECORD_DENIED
Audit ID: 1680266735.147:63
Operation: uring_override
Mask: override_creds
Denied Mask: override_creds
Profile: /root/apparmor/tests/regression/apparmor/io_uring
Peer profile: /root/apparmor/tests/regression/apparmor/io_uring
Command: io_uring
PID: 1193
Class: io_uring
Epoch: 1680266735
Audit subid: 63

View File

@ -0,0 +1,4 @@
/root/apparmor/tests/regression/apparmor/io_uring {
io_uring override_creds label=/root/apparmor/tests/regression/apparmor/io_uring,
}

View File

@ -102,11 +102,11 @@ SRCS = parser_common.c parser_include.c parser_interface.c parser_lex.c \
parser_alias.c common_optarg.c lib.c network.c \
mount.cc dbus.cc profile.cc rule.cc signal.cc ptrace.cc \
af_rule.cc af_unix.cc policy_cache.c default_features.c userns.cc \
mqueue.cc
mqueue.cc io_uring.cc
STATIC_HDRS = af_rule.h af_unix.h capability.h common_optarg.h dbus.h \
file_cache.h immunix.h lib.h mount.h network.h parser.h \
parser_include.h parser_version.h policy_cache.h policydb.h \
profile.h ptrace.h rule.h signal.h userns.h mqueue.h
profile.h ptrace.h rule.h signal.h userns.h mqueue.h io_uring.h
SPECIAL_HDRS = parser_yacc.h unit_test.h base_cap_names.h
GENERATED_HDRS = af_names.h generated_af_names.h \
@ -318,6 +318,9 @@ userns.o: userns.cc $(HDRS)
mqueue.o: mqueue.cc $(HDRS)
$(CXX) $(EXTRA_CFLAGS) -c -o $@ $<
io_uring.o: io_uring.cc $(HDRS)
$(CXX) $(EXTRA_CFLAGS) -c -o $@ $<
parser_version.h: Makefile
@echo \#define PARSER_VERSION \"$(VERSION)\" > .ver
@mv -f .ver $@

141
parser/io_uring.cc Normal file
View File

@ -0,0 +1,141 @@
/*
* Copyright (c) 2023
* Canonical Ltd. (All rights reserved)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
* License published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, contact or Canonical Ltd.
*/
#include "parser.h"
#include "profile.h"
#include "io_uring.h"
#include <iomanip>
#include <string>
#include <iostream>
#include <sstream>
void io_uring_rule::move_conditionals(struct cond_entry *conds)
{
struct cond_entry *cond_ent;
list_for_each(conds, cond_ent) {
/* disallow keyword 'in' (list) */
if (!cond_ent->eq)
yyerror("keyword \"in\" is not allowed in io_uring rules\n");
if (list_len(cond_ent->vals) > 1)
yyerror("io_uring conditional \"%s\" only supports a single value\n",
cond_ent->name);
if (strcmp(cond_ent->name, "label") == 0) {
move_conditional_value("io_uring", &label, cond_ent);
} else {
yyerror("invalid io_uring conditional \"%s\"\n",
cond_ent->name);
}
}
}
io_uring_rule::io_uring_rule(perms_t perms_p, struct cond_entry *conds, struct cond_entry *ring_conds):
perms_rule_t(AA_CLASS_IO_URING), label(NULL)
{
if (perms_p) {
if (perms_p & ~AA_VALID_IO_URING_PERMS) {
yyerror("perms contains invalid permissions for io_uring\n");
}
perms = perms_p;
} else {
/* default to all perms */
perms = AA_VALID_IO_URING_PERMS;
}
move_conditionals(conds);
move_conditionals(ring_conds);
free_cond_list(conds);
free_cond_list(ring_conds);
}
ostream &io_uring_rule::dump(ostream &os)
{
class_rule_t::dump(os);
if (perms != AA_VALID_IO_URING_PERMS) {
os << " ( ";
if (perms & AA_IO_URING_OVERRIDE_CREDS)
os << "override_creds ";
if (perms & AA_IO_URING_SQPOLL)
os << " sqpoll ";
os << ")";
}
if (label)
os << " label=" << label;
os << ",\n";
return os;
}
int io_uring_rule::expand_variables(void)
{
return 0;
}
void io_uring_rule::warn_once(const char *name)
{
rule_t::warn_once(name, "io_uring rules not enforced");
}
int io_uring_rule::gen_policy_re(Profile &prof)
{
std::ostringstream buffer;
std::string buf, labelbuf;
if (!features_supports_io_uring) {
warn_once(prof.name);
return RULE_NOT_SUPPORTED;
}
buffer << "\\x" << std::setfill('0') << std::setw(2) << std::hex << AA_CLASS_IO_URING;
buf = buffer.str();
if (label) {
if (!convert_entry(labelbuf, label))
goto fail;
buffer << labelbuf;
} else {
buffer << default_match_pattern;
}
if (perms & AA_VALID_IO_URING_PERMS) {
if (!prof.policy.rules->add_rule(buf.c_str(), rule_mode == RULE_DENY, perms,
audit == AUDIT_FORCE ? perms : 0,
dfaflags))
goto fail;
if (perms & AA_IO_URING_OVERRIDE_CREDS) {
buf = buffer.str(); /* update buf to have label */
if (!prof.policy.rules->add_rule(buf.c_str(), rule_mode == RULE_DENY,
perms, audit == AUDIT_FORCE ? perms : 0,
dfaflags))
goto fail;
}
}
return RULE_OK;
fail:
return RULE_ERROR;
}

56
parser/io_uring.h Normal file
View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2023
* Canonical Ltd. (All rights reserved)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
* License published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, contact or Canonical Ltd.
*/
#ifndef __AA_IO_URING_H
#define __AA_IO_URING_H
#include "parser.h"
#define AA_IO_URING_OVERRIDE_CREDS AA_MAY_APPEND
#define AA_IO_URING_SQPOLL AA_MAY_CREATE
#define AA_VALID_IO_URING_PERMS (AA_IO_URING_OVERRIDE_CREDS | \
AA_IO_URING_SQPOLL)
class io_uring_rule: public perms_rule_t {
void move_conditionals(struct cond_entry *conds);
public:
char *label;
io_uring_rule(perms_t perms, struct cond_entry *conds, struct cond_entry *ring_conds);
virtual ~io_uring_rule()
{
free(label);
};
virtual bool valid_prefix(const prefixes &p, const char *&error) {
if (p.owner) {
error = _("owner prefix not allowed on io_uring rules");
return false;
}
return true;
};
virtual ostream &dump(ostream &os);
virtual int expand_variables(void);
virtual int gen_policy_re(Profile &prof);
protected:
virtual void warn_once(const char *name) override;
};
#endif /* __AA_IO_URING_H */

View File

@ -353,6 +353,7 @@ extern int features_supports_domain_xattr;
extern int features_supports_userns;
extern int features_supports_posix_mqueue;
extern int features_supports_sysv_mqueue;
extern int features_supports_io_uring;
extern int kernel_supports_oob;
extern int conf_verbose;
extern int conf_quiet;

View File

@ -81,6 +81,7 @@ int features_supports_domain_xattr = 0; /* x attachment cond */
int features_supports_userns = 0; /* kernel supports user namespace */
int features_supports_posix_mqueue = 0; /* kernel supports mqueue rules */
int features_supports_sysv_mqueue = 0; /* kernel supports mqueue rules */
int features_supports_io_uring = 0; /* kernel supports io_uring rules */
int kernel_supports_oob = 0; /* out of band transitions */
int conf_verbose = 0;
int conf_quiet = 0;

View File

@ -329,6 +329,7 @@ GT >
%x ABI_MODE
%x USERNS_MODE
%x MQUEUE_MODE
%x IOURING_MODE
%%
@ -341,7 +342,7 @@ GT >
}
%}
<INITIAL,SUB_ID_WS,INCLUDE,INCLUDE_EXISTS,LIST_VAL_MODE,EXTCOND_MODE,LIST_COND_VAL,LIST_COND_PAREN_VAL,LIST_COND_MODE,EXTCONDLIST_MODE,ASSIGN_MODE,NETWORK_MODE,CHANGE_PROFILE_MODE,RLIMIT_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,ABI_MODE,USERNS_MODE,MQUEUE_MODE>{
<INITIAL,SUB_ID_WS,INCLUDE,INCLUDE_EXISTS,LIST_VAL_MODE,EXTCOND_MODE,LIST_COND_VAL,LIST_COND_PAREN_VAL,LIST_COND_MODE,EXTCONDLIST_MODE,ASSIGN_MODE,NETWORK_MODE,CHANGE_PROFILE_MODE,RLIMIT_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,ABI_MODE,USERNS_MODE,MQUEUE_MODE,IOURING_MODE>{
{WS}+ { DUMP_PREPROCESS; /* Ignoring whitespace */ }
}
@ -615,7 +616,12 @@ GT >
{ARROW} { RETURN_TOKEN(TOK_ARROW); }
}
<MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,MQUEUE_MODE>{
<IOURING_MODE>{
override_creds { RETURN_TOKEN(TOK_OVERRIDE_CREDS); }
sqpoll { RETURN_TOKEN(TOK_SQPOLL); }
}
<MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,MQUEUE_MODE,IOURING_MODE>{
({IDS_NOEQ}|{LABEL}|{QUOTED_ID}) {
yylval.id = processid(yytext, yyleng);
RETURN_TOKEN(TOK_ID);
@ -749,7 +755,7 @@ include/{WS} {
PUSH_AND_RETURN(state, token);
}
<INITIAL,NETWORK_MODE,RLIMIT_MODE,CHANGE_PROFILE_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,ABI_MODE,USERNS_MODE,MQUEUE_MODE>{
<INITIAL,NETWORK_MODE,RLIMIT_MODE,CHANGE_PROFILE_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,ABI_MODE,USERNS_MODE,MQUEUE_MODE,IOURING_MODE>{
{END_OF_RULE} {
if (YY_START != INITIAL)
POP_NODUMP();
@ -757,14 +763,14 @@ include/{WS} {
}
}
<INITIAL,SUB_ID_WS,INCLUDE,INCLUDE_EXISTS,LIST_VAL_MODE,EXTCOND_MODE,LIST_COND_VAL,LIST_COND_PAREN_VAL,LIST_COND_MODE,EXTCONDLIST_MODE,NETWORK_MODE,CHANGE_PROFILE_MODE,RLIMIT_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,ABI_MODE,USERNS_MODE,MQUEUE_MODE>{
<INITIAL,SUB_ID_WS,INCLUDE,INCLUDE_EXISTS,LIST_VAL_MODE,EXTCOND_MODE,LIST_COND_VAL,LIST_COND_PAREN_VAL,LIST_COND_MODE,EXTCONDLIST_MODE,NETWORK_MODE,CHANGE_PROFILE_MODE,RLIMIT_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,ABI_MODE,USERNS_MODE,MQUEUE_MODE,IOURING_MODE>{
\r?\n {
DUMP_PREPROCESS;
current_lineno++;
}
}
<INITIAL,SUB_ID,SUB_ID_WS,SUB_VALUE,LIST_VAL_MODE,EXTCOND_MODE,LIST_COND_VAL,LIST_COND_PAREN_VAL,LIST_COND_MODE,EXTCONDLIST_MODE,ASSIGN_MODE,NETWORK_MODE,CHANGE_PROFILE_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,RLIMIT_MODE,INCLUDE,INCLUDE_EXISTS,ABI_MODE,USERNS_MODE,MQUEUE_MODE>{
<INITIAL,SUB_ID,SUB_ID_WS,SUB_VALUE,LIST_VAL_MODE,EXTCOND_MODE,LIST_COND_VAL,LIST_COND_PAREN_VAL,LIST_COND_MODE,EXTCONDLIST_MODE,ASSIGN_MODE,NETWORK_MODE,CHANGE_PROFILE_MODE,MOUNT_MODE,DBUS_MODE,SIGNAL_MODE,PTRACE_MODE,UNIX_MODE,RLIMIT_MODE,INCLUDE,INCLUDE_EXISTS,ABI_MODE,USERNS_MODE,MQUEUE_MODE,IOURING_MODE>{
(.|\n) {
DUMP_PREPROCESS;
/* Something we didn't expect */
@ -801,4 +807,5 @@ unordered_map<int, string> state_names = {
STATE_TABLE_ENT(ABI_MODE),
STATE_TABLE_ENT(USERNS_MODE),
STATE_TABLE_ENT(MQUEUE_MODE),
STATE_TABLE_ENT(IOURING_MODE),
};

View File

@ -950,6 +950,9 @@ void set_supported_features()
features_supports_sysv_mqueue = features_intersect(kernel_features,
policy_features,
"ipc/sysv_mqueue");
features_supports_io_uring = features_intersect(kernel_features,
policy_features,
"io_uring");
}
static bool do_print_cache_dir(aa_features *features, int dirfd, const char *path)

View File

@ -124,6 +124,9 @@ static struct keyword_table keyword_table[] = {
{"mqueue", TOK_MQUEUE},
{"delete", TOK_DELETE},
{"open", TOK_OPEN},
{"io_uring", TOK_IO_URING},
{"override_creds", TOK_OVERRIDE_CREDS},
{"sqpoll", TOK_SQPOLL},
/* terminate */
{NULL, 0}

View File

@ -947,6 +947,7 @@ static const char *mediates_net_unix = CLASS_SUB_STR(AA_CLASS_NET, AF_UNIX);
static const char *mediates_ns = CLASS_STR(AA_CLASS_NS);
static const char *mediates_posix_mqueue = CLASS_STR(AA_CLASS_POSIX_MQUEUE);
static const char *mediates_sysv_mqueue = CLASS_STR(AA_CLASS_SYSV_MQUEUE);
static const char *mediates_io_uring = CLASS_STR(AA_CLASS_IO_URING);
int process_profile_policydb(Profile *prof)
{
@ -998,6 +999,9 @@ int process_profile_policydb(Profile *prof)
if (features_supports_sysv_mqueue &&
!prof->policy.rules->add_rule(mediates_sysv_mqueue, 0, AA_MAY_READ, 0, dfaflags))
goto out;
if (features_supports_io_uring &&
!prof->policy.rules->add_rule(mediates_io_uring, 0, AA_MAY_READ, 0, dfaflags))
goto out;
if (prof->policy.rules->rule_count > 0) {
int xmatch_len = 0;

View File

@ -146,6 +146,9 @@ bool check_x_qualifier(struct cod_entry *entry, const char *&errror);
%token TOK_USERNS
%token TOK_MQUEUE
%token TOK_DELETE
%token TOK_IO_URING
%token TOK_OVERRIDE_CREDS
%token TOK_SQPOLL
/* rlimits */
%token TOK_RLIMIT
@ -183,6 +186,7 @@ bool check_x_qualifier(struct cod_entry *entry, const char *&errror);
#include "af_unix.h"
#include "userns.h"
#include "mqueue.h"
#include "io_uring.h"
}
%union {
@ -201,6 +205,7 @@ bool check_x_qualifier(struct cod_entry *entry, const char *&errror);
unix_rule *unix_entry;
userns_rule *userns_entry;
mqueue_rule *mqueue_entry;
io_uring_rule *io_uring_entry;
prefix_rule_t *prefix_entry;
flagvals flags;
@ -293,6 +298,10 @@ bool check_x_qualifier(struct cod_entry *entry, const char *&errror);
%type <fperms> mqueue_perms
%type <fperms> opt_mqueue_perm
%type <mqueue_entry> mqueue_rule
%type <fperms> io_uring_perm
%type <fperms> io_uring_perms
%type <fperms> opt_io_uring_perm
%type <io_uring_entry> io_uring_rule
%%
@ -783,6 +792,7 @@ prefix_rule : mnt_rule { $$ = $1; }
| unix_rule { $$ = $1; }
| userns_rule { $$ = $1; }
| mqueue_rule { $$ = $1; }
| io_uring_rule { $$ = $1; }
rules: rules opt_prefix prefix_rule
{
@ -1558,6 +1568,38 @@ mqueue_rule: TOK_MQUEUE opt_mqueue_perm opt_conds TOK_END_OF_RULE
$$ = ent;
}
io_uring_perm: TOK_VALUE
{
if (strcmp($1, "override_creds") == 0)
$$ = AA_IO_URING_OVERRIDE_CREDS;
else if (strcmp($1, "sqpoll") == 0)
$$ = AA_IO_URING_SQPOLL;
else
$$ = 0;
if ($1)
free($1);
}
| TOK_OVERRIDE_CREDS { $$ = AA_IO_URING_OVERRIDE_CREDS; }
| TOK_SQPOLL { $$ = AA_IO_URING_SQPOLL; }
io_uring_perms: { /* nothing */ $$ = 0; }
| io_uring_perms io_uring_perm { $$ = $1 | $2; }
| io_uring_perms TOK_COMMA io_uring_perm { $$ = $1 | $3; }
opt_io_uring_perm: { /* nothing */ $$ = 0; }
| io_uring_perm { $$ = $1; }
| TOK_OPENPAREN io_uring_perms TOK_CLOSEPAREN { $$ = $2; }
io_uring_rule: TOK_IO_URING opt_io_uring_perm opt_conds opt_cond_list TOK_END_OF_RULE
{
io_uring_rule *ent;
ent = new io_uring_rule($2, $3, $4.list);
if (!ent)
yyerror(_("Memory allocation error."));
$$ = ent;
}
hat_start: TOK_CARET {}
| TOK_HAT {}

View File

@ -0,0 +1,8 @@
#
#=Description basic io_uring invalid access
#=EXRESULT FAIL
#
/usr/bin/io_uring-test {
io_uring read,
}

View File

@ -0,0 +1,8 @@
#
#=Description io_uring invalid label
#=EXRESULT FAIL
#
/usr/bin/io_uring-test {
io_uring label=,
}

View File

@ -0,0 +1,8 @@
#
#=Description basic io_uring bad conditionals
#=EXRESULT FAIL
#
/usr/bin/io_uring-test {
io_uring peer=foo,
}

View File

@ -0,0 +1,8 @@
#
#=Description io_uring valid perm bad conditionals
#=EXRESULT FAIL
#
/usr/bin/io_uring-test {
io_uring override_creds peer=foo,
}

View File

@ -0,0 +1,6 @@
#
#=Description io_uring rule outside of a profile
#=EXRESULT FAIL
#
io_uring,

View File

@ -0,0 +1,8 @@
#
#=Description basic io_uring all rule
#=EXRESULT PASS
#
/usr/bin/io_uring-rule {
io_uring,
}

View File

@ -0,0 +1,8 @@
#
#=Description basic deny io_uring all rule
#=EXRESULT PASS
#
/usr/bin/io_uring-rule {
deny io_uring,
}

View File

@ -0,0 +1,8 @@
#
#=Description basic allow io_uring all rule
#=EXRESULT PASS
#
/usr/bin/io_uring-rule {
allow io_uring,
}

View File

@ -0,0 +1,8 @@
#
#=Description basic audit io_uring all rule
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
audit io_uring,
}

View File

@ -0,0 +1,8 @@
#
#=Description basic io_uring sqpoll rule
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
io_uring sqpoll,
}

View File

@ -0,0 +1,8 @@
#
#=Description basic io_uring override_creds rule
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
io_uring override_creds,
}

View File

@ -0,0 +1,10 @@
#
#=Description io_uring override_creds rule with label
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
io_uring override_creds label=foo,
io_uring override_creds label=/path/to/foo,
io_uring label=/bar,
}

View File

@ -0,0 +1,18 @@
#
#=Description basic io_uring mixed rules
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
io_uring,
io_uring sqpoll,
io_uring override_creds,
io_uring override_creds label=/bar,
}
/usr/bin/io_uring-test2 {
io_uring override_creds label=/bar,
io_uring override_creds,
io_uring sqpoll,
io_uring,
}

View File

@ -0,0 +1,11 @@
#
#=Description io_uring combined rules
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
io_uring,
io_uring (sqpoll, override_creds),
io_uring (sqpoll override_creds),
io_uring (sqpoll, override_creds) label=foo,
io_uring (sqpoll override_creds) label = /bar,
}

View File

@ -0,0 +1,26 @@
#
#=Description basic io_uring mixed access w/modifiers rules
#=EXRESULT PASS
#
/usr/bin/io_uring-test {
deny io_uring,
audit io_uring sqpoll,
}
/usr/bin/io_uring-test2 {
allow io_uring override_creds,
audit io_uring,
}
/usr/bin/io_uring-test3 {
audit deny io_uring override_creds,
}
/usr/bin/io_uring-test4 {
audit allow io_uring sqpoll,
deny io_uring sqpoll,
}

View File

@ -104,6 +104,7 @@ SRC=access.c \
fd_inheritance.c \
fd_inheritor.c \
fork.c \
io_uring.c \
link.c \
link_subset.c \
mmap.c \
@ -354,6 +355,9 @@ userns_setns: userns_setns.c userns.h
mount: mount.c
${CC} ${CFLAGS} -std=gnu99 ${LDFLAGS} $^ -o $@ ${LDLIBS}
io_uring: io_uring.c
${CC} ${CFLAGS} ${LDFLAGS} $< -o $@ ${LDLIBS} -luring
build-dep:
@if [ `whoami` = "root" ] ;\
then \

View File

@ -0,0 +1,204 @@
/*
* Copyright (C) 2023 Canonical, Ltd.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
* License published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, contact Canonical Ltd.
*/
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <liburing.h>
#define DEFAULT_FILENAME "/tmp/io_uring_test"
#define DEFAULT_UID 1000
static int no_personality;
static int open_file(struct io_uring *ring, int cred_id, char *filename)
{
struct io_uring_cqe *cqe;
struct io_uring_sqe *sqe;
int ret, i, to_submit = 1;
sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "FAIL - could not get sqe.\n");
return 1;
}
io_uring_prep_openat(sqe, -1, filename, O_RDONLY, 0);
sqe->user_data = 1;
if (cred_id != -1)
sqe->personality = cred_id;
ret = io_uring_submit(ring);
if (ret != to_submit) {
fprintf(stderr, "FAIL - could not submit: %s\n", strerror(-ret));
goto err;
}
for (i = 0; i < to_submit; i++) {
ret = io_uring_wait_cqe(ring, &cqe);
if (ret < 0) {
fprintf(stderr, "FAIL - wait cqe failed %s\n", strerror(-ret));
goto err;
}
ret = cqe->res;
io_uring_cqe_seen(ring, cqe);
}
err:
return ret;
}
static int test_personality(struct io_uring *ring, char *filename, uid_t uid)
{
int ret, cred_id;
ret = io_uring_register_personality(ring);
if (ret < 0) {
if (ret == -EINVAL) {
no_personality = 1;
goto out;
}
fprintf(stderr, "FAIL - could not register personality: %s\n", strerror(-ret));
goto err;
}
cred_id = ret;
/* create file only owner can open */
ret = open(filename, O_RDONLY | O_CREAT, 0600);
if (ret < 0) {
perror("open");
goto err;
}
close(ret);
/* verify we can open it */
ret = open_file(ring, -1, filename);
if (ret < 0) {
fprintf(stderr, "FAIL - root could not open file: %d\n", ret);
goto err;
}
if (seteuid(uid) < 0) {
fprintf(stdout, "FAIL - could not switch to uid %u\n", uid);
goto out;
}
/* verify we can't open it with current credentials */
ret = open_file(ring, -1, filename);
if (ret != -EACCES) {
fprintf(stderr, "FAIL - opened with regular credential: %d\n", ret);
goto err;
}
/* verify we can open with registered credentials */
ret = open_file(ring, cred_id, filename);
if (ret < 0) {
fprintf(stderr, "FAIL - could not open with registered credentials: %d\n", ret);
goto err;
}
close(ret);
if (seteuid(0))
perror("FAIL - seteuid");
ret = io_uring_unregister_personality(ring, cred_id);
if (ret) {
fprintf(stderr, "FAIL - could not unregister personality: %s\n",
strerror(-ret));
goto err;
}
out:
unlink(filename);
return 0;
err:
unlink(filename);
return 1;
}
static void usage(char *pname)
{
fprintf(stderr, "Usage: %s [options]\n", pname);
fprintf(stderr, "Options can be:\n");
fprintf(stderr, " -s create ring using IORING_SETUP_SQPOLL\n");
fprintf(stderr, " -o use io_uring personality to open a file\n");
fprintf(stderr, " -u specify UID for option -s (default is %d)\n", DEFAULT_UID);
fprintf(stderr, " -f specify file opened by option -s (default is %s)\n", DEFAULT_FILENAME);
exit(EXIT_FAILURE);
}
enum op {
SQPOLL,
OVERRIDE_CREDS,
INVALID_OP,
};
int main(int argc, char *argv[])
{
struct io_uring ring;
int opt, ret = 0, op = INVALID_OP;
char *filename = DEFAULT_FILENAME;
uid_t uid = DEFAULT_UID;
while ((opt = getopt(argc, argv, "sou:f:")) != -1) {
switch (opt) {
case 's': op = SQPOLL; break;
case 'o': op = OVERRIDE_CREDS; break;
case 'u': uid = atoi(optarg); break;
case 'f': filename = optarg; break;
default: usage(argv[0]);
}
}
if (op == INVALID_OP) {
printf("FAIL - operation not selected\n");
return 1;
}
if (op == SQPOLL) {
ret = io_uring_queue_init(8, &ring, IORING_SETUP_SQPOLL);
if (ret) {
fprintf(stderr, "FAIL - failed to create sqpoll ring: %s\n",
strerror(-ret));
return 1;
}
io_uring_queue_exit(&ring);
}
if (op == OVERRIDE_CREDS) {
ret = io_uring_queue_init(8, &ring, 0);
if (ret) {
fprintf(stderr, "FAIL - failed to create override_creds ring: %s\n",
strerror(-ret));
return 1;
}
ret = test_personality(&ring, filename, uid);
if (no_personality) {
/* personality was added in kernel 5.6 */
printf("Personalities not supported, skipping...\n");
} else if (ret) {
fprintf(stderr, "FAIL - override_creds failed\n");
return ret;
}
io_uring_queue_exit(&ring);
}
printf("PASS\n");
return 0;
}

View File

@ -0,0 +1,83 @@
#! /bin/bash
#Copyright (C) 2023 Canonical, Ltd.
#
#This program is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public License as
#published by the Free Software Foundation, version 2 of the
#License.
#=NAME io_uring
#=DESCRIPTION
# This test verifies if mediation of io_uring is working
#=END
pwd=`dirname $0`
pwd=`cd $pwd ; /bin/pwd`
bin=$pwd
. $bin/prologue.inc
requires_kernel_features io_uring
requires_parser_support "io_uring,"
settest io_uring
uid=1000
file=$tmpdir/io_uring_test
label=$bin/io_uring
required_perms="$file:rw cap:setuid cap:ipc_lock"
do_test()
{
local desc="IO_URING ($1)"
shift
runchecktest "$desc" "$@"
}
do_tests()
{
prefix=$1
expect_sqpoll=$2
expect_override_creds=$3
do_test "$prefix - test sqpoll" $expect_sqpoll -s
do_test "$prefix - test override_creds" $expect_override_creds -o -u $uid -f $file
}
# make sure it works unconfined
do_tests "unconfined" pass pass
genprofile $required_perms
do_tests "no perms" fail fail
genprofile $required_perms "qual=deny:io_uring"
do_tests "deny perms" fail fail
genprofile $required_perms "io_uring"
do_tests "generic perms" pass pass
genprofile $required_perms "io_uring:sqpoll"
do_tests "only sqpoll perm" pass fail
genprofile $required_perms "io_uring:override_creds"
do_tests "only override_creds perm" fail pass
genprofile $required_perms "io_uring:(sqpoll, override_creds)"
do_tests "explicit perms" pass pass
genprofile $required_perms "io_uring:sqpoll:label=$label"
do_tests "specify label without override_creds perm" pass fail
genprofile $required_perms "io_uring:label=$label"
do_tests "all perms specify label" pass pass
genprofile $required_perms "io_uring:(sqpoll, override_creds):label=$label"
do_tests "specify perms specify label" pass pass
genprofile $required_perms "io_uring:override_creds:label=$label"
do_tests "specify label" fail pass
genprofile $required_perms "io_uring:override_creds:label=/foo"
do_tests "invalid label" fail fail

View File

@ -443,6 +443,22 @@ sub gen_mqueue($@) {
}
}
sub gen_io_uring($@) {
my ($rule, $qualifier) = @_;
my @rules = split (/:/, $rule);
if (@rules == 2) {
if ($rules[1] =~ /^ALL$/) {
push (@{$output_rules{$hat}}, " ${qualifier}io_uring,\n");
} else {
push (@{$output_rules{$hat}}, " ${qualifier}io_uring $rules[1],\n");
}
} elsif (@rules == 3) {
push (@{$output_rules{$hat}}, " ${qualifier}io_uring $rules[1] $rules[2],\n");
} else {
(!$nowarn) && print STDERR "Warning: invalid io_uring description '$rule', ignored\n";
}
}
sub emit_flags($) {
my $hat = shift;
@ -514,6 +530,8 @@ sub gen_from_args() {
gen_path($rule);
} elsif ($rule =~ /^mqueue:/) {
gen_mqueue($rule, $qualifier);
} elsif ($rule =~ /^io_uring:/) {
gen_io_uring($rule, $qualifier);
} else {
gen_file($rule, $qualifier);
}

View File

@ -52,6 +52,7 @@ from apparmor.rule.ptrace import PtraceRule
from apparmor.rule.signal import SignalRule
from apparmor.rule.userns import UserNamespaceRule
from apparmor.rule.mqueue import MessageQueueRule
from apparmor.rule.io_uring import IOUringRule
from apparmor.translations import init_translation
_ = init_translation()
@ -1742,6 +1743,15 @@ def collapse_log(hashlog, ignore_null_profiles=True):
if not hat_exists or not is_known_rule(aa[profile][hat], 'mqueue', mqueue_event):
log_dict[aamode][full_profile]['mqueue'].add(mqueue_event)
io_uring = hashlog[aamode][full_profile]['io_uring']
for access in io_uring.keys():
for label in io_uring[access]:
if not label:
label = IOUringRule.ALL
io_uring_event = IOUringRule(access, label, log_event=True)
if not hat_exists or not is_known_rule(aa[profile][hat], 'io_uring', io_uring_event):
log_dict[aamode][full_profile]['io_uring'].add(io_uring_event)
return log_dict
@ -2119,6 +2129,7 @@ def match_line_against_rule_classes(line, profile, file, lineno, in_preamble):
'signal',
'userns',
'mqueue',
'io_uring',
):
if rule_name in ruletypes:

View File

@ -59,6 +59,7 @@ class ReadLog:
'signal': hasher(),
'userns': hasher(),
'mqueue': hasher(),
'io_uring': hasher(),
}
def prefetch_next_log_entry(self):
@ -122,6 +123,9 @@ class ReadLog:
ev['interface'] = event.dbus_interface
ev['member'] = event.dbus_member
elif ev['operation'] and ev['operation'].startswith('uring_'):
ev['peer_profile'] = event.peer_profile
LibAppArmor.free_record(event)
if not ev['time']:
@ -196,6 +200,10 @@ class ReadLog:
self.hashlog[aamode][full_profile]['mqueue'][e['denied_mask']][mqueue_type][e['name']] = True
return
elif e['class'] and e['class'] == 'io_uring':
self.hashlog[aamode][full_profile]['io_uring'][e['denied_mask']][e['peer_profile']] = True
return
elif self.op_type(e) == 'file':
# Map c (create) and d (delete) to w (logging is more detailed than the profile language)
dmask = e['denied_mask']

View File

@ -29,6 +29,7 @@ from apparmor.rule.rlimit import RlimitRule, RlimitRuleset
from apparmor.rule.signal import SignalRule, SignalRuleset
from apparmor.rule.userns import UserNamespaceRule, UserNamespaceRuleset
from apparmor.rule.mqueue import MessageQueueRule, MessageQueueRuleset
from apparmor.rule.io_uring import IOUringRule, IOUringRuleset
from apparmor.translations import init_translation
_ = init_translation()
@ -46,6 +47,7 @@ ruletypes = {
'signal': {'rule': SignalRule, 'ruleset': SignalRuleset},
'userns': {'rule': UserNamespaceRule, 'ruleset': UserNamespaceRuleset},
'mqueue': {'rule': MessageQueueRule, 'ruleset': MessageQueueRuleset},
'io_uring': {'rule': IOUringRule, 'ruleset': IOUringRuleset},
}
@ -198,6 +200,7 @@ class ProfileStorage:
'file',
'change_profile',
'userns',
'io_uring',
]
data = []

View File

@ -53,6 +53,7 @@ RE_PROFILE_PIVOT_ROOT = re.compile(RE_AUDIT_DENY + r'(pivot_root\s*,|pivot_root\
RE_PROFILE_UNIX = re.compile(RE_AUDIT_DENY + r'(unix\s*,|unix\s+[^#]*\s*,)' + RE_EOL)
RE_PROFILE_USERNS = re.compile(RE_AUDIT_DENY + r'(userns\s*,|userns(?P<details>\s+[^#]*)\s*,)' + RE_EOL)
RE_PROFILE_MQUEUE = re.compile(RE_AUDIT_DENY + r'(mqueue\s*,|mqueue(?P<details>\s+[^#]*)\s*,)' + RE_EOL)
RE_PROFILE_IO_URING = re.compile(RE_AUDIT_DENY + r'(io_uring\s*,|io_uring(?P<details>\s+[^#]*)\s*,)' + RE_EOL)
# match anything that's not " or #, or matching quotes with anything except quotes inside
__re_no_or_quoted_hash = '([^#"]|"[^"]*")*'

View File

@ -0,0 +1,164 @@
# ----------------------------------------------------------------------
# Copyright (C) 2023 Canonical, Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# ----------------------------------------------------------------------
import re
from apparmor.regex import RE_PROFILE_IO_URING, RE_PROFILE_NAME
from apparmor.common import AppArmorBug, AppArmorException
from apparmor.rule import BaseRule, BaseRuleset, check_and_split_list, logprof_value_or_all, parse_modifiers, quote_if_needed
# setup module translations
from apparmor.translations import init_translation
_ = init_translation()
access_keywords = ['sqpoll', 'override_creds']
joint_access_keyword = r'\s*(' + '|'.join(access_keywords) + r')\s*'
RE_ACCESS_KEYWORDS = (joint_access_keyword + # one of the access_keyword or
'|' + # or
r'\(' + joint_access_keyword + '(' + r'(\s|,)+' + joint_access_keyword + ')*' + r'\)' # one or more access_keyword in (...)
)
RE_IO_URING_DETAILS = re.compile(
r'^' +
r'(\s+(?P<access>' + RE_ACCESS_KEYWORDS + r'))?' + # optional access keyword(s)
r'(\s+(label\s*=\s*' + RE_PROFILE_NAME % 'label' + r'))?' + # optional label
r'\s*$')
class IOUringRule(BaseRule):
'''Class to handle and store a single io_uring rule'''
# Nothing external should reference this class, all external users
# should reference the class field IOUringRule.ALL
class __IOUringAll(object):
pass
ALL = __IOUringAll
rule_name = 'io_uring'
_match_re = RE_PROFILE_IO_URING
def __init__(self, access, label, audit=False, deny=False,
allow_keyword=False, comment='', log_event=None):
super().__init__(audit=audit, deny=deny,
allow_keyword=allow_keyword,
comment=comment,
log_event=log_event)
self.access, self.all_access, unknown_items = check_and_split_list(access, access_keywords, self.ALL, type(self).__name__, 'access')
if unknown_items:
raise AppArmorException(_('Passed unknown access keyword to %s: %s') % (type(self).__name__, ' '.join(unknown_items)))
self.label, self.all_labels = self._aare_or_all(label, 'label', is_path=False, log_event=log_event)
@classmethod
def _create_instance(cls, raw_rule, matches):
'''parse raw_rule and return instance of this class'''
audit, deny, allow_keyword, comment = parse_modifiers(matches)
rule_details = ''
if matches.group('details'):
rule_details = matches.group('details')
if rule_details:
details = RE_IO_URING_DETAILS.search(rule_details)
if not details:
raise AppArmorException(_("Invalid or unknown keywords in 'io_uring %s" % rule_details))
if details.group('access'):
access = details.group('access')
if access.startswith('(') and access.endswith(')'):
access = access[1:-1]
access = access.replace(',', ' ').split() # split by ',' or whitespace
else:
access = cls.ALL
if details.group('label'):
label = details.group('label')
else:
label = cls.ALL
else:
access = cls.ALL
label = cls.ALL
return cls(access, label, audit=audit, deny=deny,
allow_keyword=allow_keyword, comment=comment)
def get_clean(self, depth=0):
'''return rule (in clean/default formatting)'''
space = ' ' * depth
if self.all_access:
access = ''
elif len(self.access) == 1:
access = ' %s' % ' '.join(self.access)
elif self.access:
access = ' (%s)' % ' '.join(sorted(self.access))
else:
raise AppArmorBug('Empty access in io_uring rule')
if self.all_labels:
label = ''
elif self.label:
label = ' label=%s' % quote_if_needed(self.label.regex)
else:
raise AppArmorBug('Empty label in io_uring rule')
return('%s%sio_uring%s%s,%s' % (space, self.modifiers_str(), access, label, self.comment))
def _is_covered_localvars(self, other_rule):
'''check if other_rule is covered by this rule object'''
if not self._is_covered_list(self.access, self.all_access, other_rule.access, other_rule.all_access, 'access'):
return False
if not self._is_covered_aare(self.label, self.all_labels, other_rule.label, other_rule.all_labels, 'label'):
return False
# still here? -> then it is covered
return True
def _is_equal_localvars(self, rule_obj, strict):
'''compare if rule-specific variables are equal'''
if (self.access != rule_obj.access or
self.all_access != rule_obj.all_access):
return False
if not self._is_equal_aare(self.label, self.all_labels, rule_obj.label, rule_obj.all_labels, 'label'):
return False
return True
def _logprof_header_localvars(self):
access = logprof_value_or_all(self.access, self.all_access)
label = logprof_value_or_all(self.label, self.all_labels)
return (
_('Access mode'), access,
_('Label'), label,
)
class IOUringRuleset(BaseRuleset):
'''Class to handle and store a collection of io_uring rules'''
def get_glob(self, path_or_rule):
'''Return the next possible glob. For io_uring rules, that means removing access and label'''
# XXX only remove one part, not all
return 'io_uring,'

194
utils/test/test-io_uring.py Normal file
View File

@ -0,0 +1,194 @@
#!/usr/bin/python3
# ----------------------------------------------------------------------
# Copyright (C) 2023 Canonical, Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# ----------------------------------------------------------------------
import unittest
from collections import namedtuple
from common_test import AATest, setup_all_loops
from apparmor.rule.io_uring import IOUringRule, IOUringRuleset
from apparmor.common import AppArmorException, AppArmorBug
from apparmor.translations import init_translation
_ = init_translation()
class IOUringTestParse(AATest):
tests = (
# access label audit deny allow comment
('io_uring,', IOUringRule(IOUringRule.ALL, IOUringRule.ALL, False, False, False, '')),
('io_uring sqpoll,', IOUringRule(('sqpoll'), IOUringRule.ALL, False, False, False, '')),
('io_uring override_creds,', IOUringRule(('override_creds'), IOUringRule.ALL, False, False, False, '')),
('io_uring override_creds label=/foo,', IOUringRule(('override_creds'), '/foo', False, False, False, '')),
('io_uring sqpoll label=bar,', IOUringRule(('sqpoll'), 'bar', False, False, False, '')),
('io_uring (override_creds, sqpoll) label=/foo,', IOUringRule(('override_creds', 'sqpoll'), '/foo', False, False, False, '')),
('audit io_uring sqpoll,', IOUringRule(('sqpoll'), IOUringRule.ALL, True, False, False, '')),
('deny io_uring,', IOUringRule(IOUringRule.ALL, IOUringRule.ALL, False, True, False, '')),
('deny io_uring (sqpoll, override_creds),', IOUringRule(('sqpoll', 'override_creds'), IOUringRule.ALL, False, True, False, '')),
('audit allow io_uring,', IOUringRule(IOUringRule.ALL, IOUringRule.ALL, True, False, True, '')),
('io_uring override_creds, # cmt', IOUringRule(('override_creds'), IOUringRule.ALL, False, False, False, ' # cmt')),
)
def _run_test(self, rawrule, expected):
self.assertTrue(IOUringRule.match(rawrule))
obj = IOUringRule.create_instance(rawrule)
expected.raw_rule = rawrule.strip()
self.assertTrue(obj.is_equal(expected, True))
class IOUringTestParseInvalid(AATest):
tests = (
('io_uring invalidaccess,', AppArmorException),
('io_uring label=,', AppArmorException),
('io_uring invalidaccess label=foo,', AppArmorException),
('io_uring sqpoll label=,', AppArmorException),
)
def _run_test(self, rawrule, expected):
self.assertTrue(IOUringRule.match(rawrule)) # the above invalid rules still match the main regex!
with self.assertRaises(expected):
IOUringRule.create_instance(rawrule)
def test_parse_fail(self):
with self.assertRaises(AppArmorException):
IOUringRule.create_instance('foo,')
def test_diff_non_iouringrule(self):
exp = namedtuple('exp', ('audit', 'deny'))
obj = IOUringRule(('sqpoll'), IOUringRule.ALL)
with self.assertRaises(AppArmorBug):
obj.is_equal(exp(False, False), False)
def test_diff_access(self):
obj1 = IOUringRule(IOUringRule.ALL, IOUringRule.ALL)
obj2 = IOUringRule(('sqpoll'), IOUringRule.ALL)
self.assertFalse(obj1.is_equal(obj2, False))
def test_diff_label(self):
obj1 = IOUringRule(IOUringRule.ALL, 'foo')
obj2 = IOUringRule(IOUringRule.ALL, '/bar')
self.assertFalse(obj1.is_equal(obj2, False))
class InvalidIOUringInit(AATest):
tests = (
# init params expected exception
(('', 'label'), AppArmorBug), # empty access
((' ', 'label'), AppArmorBug), # whitespace access
(('xyxy', 'label'), AppArmorException), # invalid access
((dict(), 'label'), AppArmorBug), # wrong type for access
((None, 'label'), AppArmorBug), # wrong type for access
(('sqpoll', ''), AppArmorBug), # empty label
(('sqpoll', ' '), AppArmorBug), # whitespace label
(('sqpoll', dict()), AppArmorBug), # wrong type for label
(('sqpoll', None), AppArmorBug), # wrong type for label
)
def _run_test(self, params, expected):
with self.assertRaises(expected):
IOUringRule(*params)
def test_missing_params1(self):
with self.assertRaises(TypeError):
IOUringRule()
def test_missing_params2(self):
with self.assertRaises(TypeError):
IOUringRule('override_creds')
class WriteIOUringTestAATest(AATest):
tests = (
# raw rule clean rule
(' io_uring , # foo ', 'io_uring, # foo'),
(' audit io_uring sqpoll,', 'audit io_uring sqpoll,'),
(' audit io_uring (override_creds ),', 'audit io_uring override_creds,'),
(' audit io_uring (sqpoll , override_creds ),', 'audit io_uring (override_creds sqpoll),'),
(' deny io_uring sqpoll label=bar,# foo bar', 'deny io_uring sqpoll label=bar, # foo bar'),
(' deny io_uring override_creds ,# foo bar', 'deny io_uring override_creds, # foo bar'),
(' allow io_uring label=tst ,# foo bar' , 'allow io_uring label=tst, # foo bar'),
('io_uring,', 'io_uring,'),
('io_uring (override_creds),', 'io_uring override_creds,'),
('io_uring (sqpoll),', 'io_uring sqpoll,'),
('io_uring (sqpoll override_creds),', 'io_uring (override_creds sqpoll),'),
('io_uring sqpoll label="tst",', 'io_uring sqpoll label="tst",'),
('io_uring (override_creds) label=bar,', 'io_uring override_creds label=bar,'),
('io_uring (sqpoll override_creds) label=/foo,', 'io_uring (override_creds sqpoll) label=/foo,'),
)
def _run_test(self, rawrule, expected):
self.assertTrue(IOUringRule.match(rawrule))
obj = IOUringRule.create_instance(rawrule)
clean = obj.get_clean()
raw = obj.get_raw()
self.assertEqual(expected.strip(), clean, 'unexpected clean rule')
self.assertEqual(rawrule.strip(), raw, 'unexpected raw rule')
def test_write_manually(self):
obj = IOUringRule('sqpoll', IOUringRule.ALL, allow_keyword=True)
expected = ' allow io_uring sqpoll,'
self.assertEqual(expected, obj.get_clean(2), 'unexpected clean rule')
self.assertEqual(expected, obj.get_raw(2), 'unexpected raw rule')
def test_write_invalid_access(self):
obj = IOUringRule('sqpoll', IOUringRule.ALL)
obj.access = ''
with self.assertRaises(AppArmorBug):
obj.get_clean()
def test_write_invalid_label(self):
obj = IOUringRule(IOUringRule.ALL, 'bar')
obj.label = ''
with self.assertRaises(AppArmorBug):
obj.get_clean()
class IOUringIsCoveredTest(AATest):
def test_is_covered(self):
obj = IOUringRule(IOUringRule.ALL, 'ba*')
self.assertTrue(obj.is_covered(IOUringRule(('sqpoll'), 'ba')))
self.assertTrue(obj.is_covered(IOUringRule(IOUringRule.ALL, 'baz')))
def test_is_not_covered(self):
obj = IOUringRule(('sqpoll'), 'foo')
self.assertFalse(obj.is_covered(IOUringRule(IOUringRule.ALL, 'foo')))
self.assertFalse(obj.is_covered(IOUringRule(('sqpoll'), IOUringRule.ALL)))
class IOUringLogprofHeaderTest(AATest):
tests = (
('io_uring,', [_('Access mode'), _('ALL'), _('Label'), _('ALL')]),
('io_uring sqpoll,', [_('Access mode'), 'sqpoll' , _('Label'), _('ALL')]),
('io_uring override_creds,', [_('Access mode'), 'override_creds', _('Label'), _('ALL')]),
('io_uring (sqpoll,override_creds),', [_('Access mode'), 'override_creds sqpoll', _('Label'), _('ALL')]),
('io_uring sqpoll label=/foo,', [_('Access mode'), 'sqpoll' , _('Label'), '/foo']),
('io_uring override_creds label=bar,', [_('Access mode'), 'override_creds', _('Label'), 'bar']),
('io_uring (sqpoll,override_creds) label=baz,', [_('Access mode'), 'override_creds sqpoll', _('Label'), 'baz']),
)
def _run_test(self, params, expected):
obj = IOUringRule.create_instance(params)
self.assertEqual(obj.logprof_header(), expected)
class IOUringGlobTestAATest(AATest):
def test_glob(self):
self.assertEqual(IOUringRuleset().get_glob('io_uring sqpoll,'), 'io_uring,')
setup_all_loops(__name__)
if __name__ == '__main__':
unittest.main(verbosity=1)