mirror of
https://github.com/sudo-project/sudo.git
synced 2025-08-31 06:15:37 +00:00
plugins/python: add example python policy plugin
This commit is contained in:
committed by
Todd C. Miller
parent
ee856cc4ba
commit
d8432fca34
173
plugins/python/example_policy_plugin.py
Normal file
173
plugins/python/example_policy_plugin.py
Normal file
@@ -0,0 +1,173 @@
|
||||
import sudo
|
||||
|
||||
import errno
|
||||
import sys
|
||||
import os
|
||||
import pwd
|
||||
import grp
|
||||
import shutil
|
||||
from copy import copy
|
||||
from typing import Tuple, Dict
|
||||
|
||||
|
||||
VERSION = 1.0
|
||||
|
||||
|
||||
class SudoPluginError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SudoPolicyPlugin(sudo.Plugin):
|
||||
"""Example sudo policy plugin
|
||||
|
||||
Demonstrates how to use the sudo policy plugin API. All functions are added
|
||||
as an example on their syntax, but note that most of them are optional
|
||||
(except check_policy). Also typing annotations are just here for the help
|
||||
on the syntax (requires python >= 3.5).
|
||||
|
||||
On detailed description of the functions refer to sudo_plugin manual (man
|
||||
sudo_plugin).
|
||||
|
||||
Most functions can express error or reject through their "int" return value
|
||||
as documented in the manual. The sudo module also has constants for these:
|
||||
sudo.RC_ACCEPT / sudo.RC_OK 1
|
||||
sudo.RC_REJECT 0
|
||||
sudo.RC_ERROR -1
|
||||
sudo.RC_USAGE_ERROR -2
|
||||
|
||||
If the function returns "None" (for example does not call return), it will
|
||||
be considered sudo.RC_OK. If an exception is raised, its backtrace will be
|
||||
shown to the user and the plugin function returns sudo.RC_ERROR. If that is
|
||||
not acceptable, catch it.
|
||||
"""
|
||||
|
||||
_allowed_commands = {"id", "whoami"}
|
||||
_safe_password = "12345"
|
||||
|
||||
# -- Plugin API functions --
|
||||
|
||||
def __init__(self, user_env: Tuple[str, ...], settings: Tuple[str, ...],
|
||||
version: str, **kwargs):
|
||||
"""The constructor matches the C sudo plugin API open() call
|
||||
|
||||
Other variables you can currently use as arguments are:
|
||||
user_info: Tuple[str, ...]
|
||||
plugin_options: Tuple[str, ...]
|
||||
|
||||
For their detailed description, see the open() call of the C plugin API
|
||||
in the sudo manual ("man sudo").
|
||||
"""
|
||||
if not version.startswith("1."):
|
||||
raise sudo.SudoException(
|
||||
"This plugin plugin is not compatible with python plugin"
|
||||
"API version {}".format(version))
|
||||
|
||||
self.user_env = sudo.options_as_dict(user_env)
|
||||
self.settings = sudo.options_as_dict(settings)
|
||||
|
||||
def check_policy(self, argv: Tuple[str, ...], env_add: Tuple[str, ...]):
|
||||
cmd = argv[0]
|
||||
# Example for a simple reject:
|
||||
if not self._is_command_allowed(cmd):
|
||||
sudo.log_error("You are not allowed to run this command!")
|
||||
return sudo.RC_REJECT
|
||||
|
||||
# The environment the command will be executed with (we allow any here)
|
||||
user_env_out = sudo.options_from_dict(self.user_env) + env_add
|
||||
|
||||
try:
|
||||
command_info_out = sudo.options_from_dict({
|
||||
"command": self._find_on_path(cmd), # Absolute path of command
|
||||
"runas_uid": self._runas_uid(), # The user id
|
||||
"runas_gid": self._runas_gid(), # The group id
|
||||
})
|
||||
except SudoPluginError as error:
|
||||
sudo.log_error(str(error))
|
||||
return sudo.RC_ERROR
|
||||
|
||||
return (sudo.RC_ACCEPT, command_info_out, argv, user_env_out)
|
||||
|
||||
def init_session(self, user_pwd: Tuple, user_env: Tuple[str, ...]):
|
||||
"""Perform session setup
|
||||
|
||||
Beware that user_pwd can be None if user is not present in the password
|
||||
database. Otherwise it is a tuple convertible to pwd.struct_passwd.
|
||||
"""
|
||||
# conversion example:
|
||||
user_pwd = pwd.struct_passwd(user_pwd) if user_pwd else None
|
||||
|
||||
# This is how you change the user_env:
|
||||
return (sudo.RC_OK, user_env)
|
||||
|
||||
# If you do not want to change user_env, you can also just return (or None):
|
||||
# return sudo.RC_OK
|
||||
|
||||
def list(self, argv: Tuple[str, ...], is_verbose: int, user: str):
|
||||
cmd = argv[0] if argv else None
|
||||
as_user_text = "as user '{}'".format(user) if user else ""
|
||||
|
||||
if cmd:
|
||||
allowed_text = "" if self._is_command_allowed(cmd) else "NOT "
|
||||
sudo.log_info("You are {}allowed to execute command '{}'{}"
|
||||
.format(allowed_text, cmd, as_user_text))
|
||||
|
||||
if not cmd or is_verbose:
|
||||
sudo.log_info("Only the following commands are allowed:",
|
||||
", ".join(self._allowed_commands), as_user_text)
|
||||
|
||||
def validate(self):
|
||||
pass # we have no cache
|
||||
|
||||
def invalidate(self, remove: int):
|
||||
pass # we have no cache
|
||||
|
||||
def show_version(self, is_verbose: int):
|
||||
sudo.log_info("Python Policy Plugin version: {}".format(VERSION))
|
||||
if is_verbose:
|
||||
sudo.log_info("Python interpreter version:", sys.version)
|
||||
|
||||
def close(self, exit_status: int, error: int) -> None:
|
||||
if error == 0:
|
||||
sudo.log_info("The command returned with exit_status {}".format(
|
||||
exit_status))
|
||||
else:
|
||||
error_name = errno.errorcode.get(error, "???")
|
||||
sudo.log_error(
|
||||
"Failed to execute command, execve syscall returned "
|
||||
"{} ({})".format(error, error_name))
|
||||
|
||||
# -- Helper functions --
|
||||
|
||||
def _is_command_allowed(self, cmd):
|
||||
return os.path.basename(cmd) in self._allowed_commands
|
||||
|
||||
def _find_on_path(self, cmd):
|
||||
if os.path.isabs(cmd):
|
||||
return cmd
|
||||
|
||||
path = self.user_env.get("PATH", "/usr/bin:/bin")
|
||||
absolute_cmd = shutil.which(cmd, path=path)
|
||||
if not absolute_cmd:
|
||||
raise SudoPluginError("Can not find cmd '{}' on PATH".format(cmd))
|
||||
return absolute_cmd
|
||||
|
||||
def _runas_pwd(self):
|
||||
runas_user = self.settings.get("runas_user") or "root"
|
||||
try:
|
||||
return pwd.getpwnam(runas_user)
|
||||
except KeyError:
|
||||
raise SudoPluginError("Could not find user '{}'".format(runas_user))
|
||||
|
||||
def _runas_uid(self):
|
||||
return self._runas_pwd().pw_uid
|
||||
|
||||
def _runas_gid(self):
|
||||
runas_group = self.settings.get("runas_group")
|
||||
if runas_group is None:
|
||||
return self._runas_pwd().pw_gid
|
||||
|
||||
try:
|
||||
return grp.getgrnam(runas_group).gr_gid
|
||||
except KeyError:
|
||||
raise SudoPluginError(
|
||||
"Could not find group '{}'".format(runas_group))
|
Reference in New Issue
Block a user