2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-22 10:09:40 +00:00
microbot/ubot/command_handler.py

367 lines
15 KiB
Python

import asyncio
from functools import partial
from inspect import isawaitable
from random import randint
from re import DOTALL, IGNORECASE, escape, search
from traceback import format_exc, print_exc
from telethon import events
from telethon.errors.rpcerrorlist import (ChatAdminRequiredError,
ChatWriteForbiddenError)
from ubot.command import CallbackQueryCommand, Command
from ubot.custom import (ExtendedCallbackQuery, ExtendedInlineQuery,
ExtendedNewMessage)
from ubot.database import ChatWrapper
from .fixes import inline_photos
# Tail shared by the prefixed and simple templates: a separator (space,
# underscore or newlines) or end of text, followed by the argument capture.
_ARGS_TAIL = "(?:[ _]|\n+|$)(.*)?"
# Prefixed command. Format fields: 0 = command prefix, 1 = command, 2 = bot username
PATTERN_TEMPLATE = "^{0}({1})(?:@{2})?" + _ARGS_TAIL
# Prefix-less command. Format fields: 0 = command
SIMPLE_PATTERN_TEMPLATE = "^({0})" + _ARGS_TAIL
# The command's own pattern, used verbatim. Format fields: 0 = command
RAW_PATTERN_TEMPLATE = "{0}"
# Seconds a chat stays on cooldown after a moderation command has run in it
MODERATION_COMMAND_COOLDOWN_SEC = 3
class CommandHandler():
    """Route Telegram events to the commands registered with this handler.

    On construction it attaches four handlers to the bot's client: two for
    incoming messages (the strict and the lenient command lists), one for
    inline queries and one for callback queries.
    """

    # Command registries. These are class-level lists, so instances share the
    # same list objects. The two incoming lists are additionally captured by
    # the message handlers registered in __init__, which is why they must only
    # ever be mutated in place (see clear_commands).
    incoming_commands = []
    incoming_lenient_commands = []
    inline_photo_commands = []
    inline_article_commands = []
    callback_queries = []

    def __init__(self, loader):
        """Store references taken from *loader* and register the event handlers."""
        self.loader = loader
        self.micro_bot = loader.micro_bot
        self.settings = self.micro_bot.settings
        self.logger = loader.logger
        self.db = loader.db
        self.hard_prefix = self.settings.get_list("hard_cmd_prefix") or ["/"]

        client = self.micro_bot.client
        # Strict commands: non-forwarded incoming messages that carry text.
        client.add_event_handler(
            partial(self.report_incoming_excepts, self.incoming_commands),
            events.NewMessage(incoming=True, forwards=False, func=lambda e: e.raw_text)
        )
        # Lenient commands: every incoming message.
        client.add_event_handler(
            partial(self.report_incoming_excepts, self.incoming_lenient_commands),
            events.NewMessage(incoming=True)
        )
        client.add_event_handler(self.handle_inline, events.InlineQuery())
        client.add_event_handler(self.handle_callback_query, events.CallbackQuery())

    async def report_incoming_excepts(self, command_list, event):
        """Run handle_incoming and send any unexpected traceback to the first owner.

        ChatAdminRequiredError and ChatWriteForbiddenError are ignored.
        """
        try:
            await self.handle_incoming(event, command_list)
        except (ChatAdminRequiredError, ChatWriteForbiddenError):
            pass
        except Exception:
            report = str(format_exc())

            try:
                await event.client.send_message(int(self.settings.get_list("owner_id")[0]), report)
            except Exception:
                # The report itself failed (no owner configured, send error...);
                # print so the traceback chain is not lost.
                print_exc()

    async def handle_incoming(self, event: ExtendedNewMessage, command_list: list[Command]):
        """Match *event* against every command in *command_list* and run the ones that apply.

        Every matching command is considered; the loop does not stop at the
        first match. Before execution the event is decorated with ``command``,
        ``args``, ``other_args``, ``prefix`` (prefixed commands only),
        ``pattern_match``, ``extra``, ``object`` and ``chat_db``.
        """
        chat_db = self.db.get_chat((await event.get_chat()).id)
        chat_prefix = chat_db.prefix

        for command in command_list:
            pattern_match = self._match_command(command, event.raw_text, chat_prefix)

            if not pattern_match:
                continue

            command.uses += 1

            if command.moderation and not chat_db.modmode_enabled:
                continue

            allowed, denial = await self.check_privs(event, command, chat_db)

            if not allowed:
                if denial:
                    await event.reply(denial)

                continue

            if command.filter and not await self._passes_filter(command, event):
                continue

            if command.pass_nsfw:
                event.nsfw_disabled = not chat_db.nsfw_enabled

            groups = pattern_match.groups()

            if command.raw_pattern:
                event.command = command.pattern
            elif command.simple_pattern:
                # The simple template has no prefix group, so the command is
                # the first group (the inline handlers read it the same way).
                event.command = groups[0]
            else:
                # Prefixed template: group 0 is the prefix, group 1 the command.
                event.command = groups[1]

            if event.chat and not command.not_disableable and event.command in chat_db.disabled_commands:
                continue

            if not command.raw_pattern:
                event.args = groups[-1].strip()

                if command.simple_pattern:
                    event.other_args = groups[1:-1]
                else:
                    event.other_args = groups[2:-1]
                    event.prefix = groups[0]

            event.pattern_match = pattern_match
            event.extra = command.extra
            event.object = command
            event.chat_db = chat_db

            await self.execute_command(event, command)

    def _match_command(self, command: Command, text, chat_prefix):
        """Return the regex match of *text* against *command*, or None."""
        flags = IGNORECASE|DOTALL
        body = command.pattern + command.pattern_extra

        if command.simple_pattern:
            return search(SIMPLE_PATTERN_TEMPLATE.format(body), text, flags)

        if command.raw_pattern:
            return search(RAW_PATTERN_TEMPLATE.format(body), text, flags)

        prefix_list = self.hard_prefix + [chat_prefix]

        if command.not_disableable or command.force_standard_prefix:
            # These commands always answer to "/" as well.
            prefix_list = prefix_list + ["/"]

        prefix_group = f"({'|'.join([escape(i) for i in prefix_list])})"
        return search(PATTERN_TEMPLATE.format(prefix_group, body, self.micro_bot.me.username), text, flags)

    async def _passes_filter(self, command, event) -> bool:
        """Evaluate ``command.filter(event)``, awaiting the result when it is awaitable."""
        verdict = command.filter(event)

        # The awaitable is the *result* of calling an async filter, not the
        # filter function itself, so the check has to happen after the call.
        if isawaitable(verdict):
            verdict = await verdict

        return bool(verdict)

    def _is_privileged(self, event) -> bool:
        """Return True when the sender is an owner or a sudo user."""
        return self.is_owner(event) or self.is_sudo(event)

    async def handle_inline(self, event: ExtendedInlineQuery):
        """Answer an inline query with the first matching photo or article command.

        Photo commands are tried before article commands. Blacklisted users who
        are neither owner nor sudo get no answer; if nothing matches, the
        fallback list of defaults is sent.
        """
        routes = (
            (self.inline_photo_commands, self.handle_inline_photo),
            (self.inline_article_commands, self.handle_inline_article),
        )

        for commands, handler in routes:
            for command in commands:
                pattern_match = search(SIMPLE_PATTERN_TEMPLATE.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)

                if not pattern_match:
                    continue

                if self.is_blacklisted(event) and not self._is_privileged(event):
                    return

                await handler(event, pattern_match, command)
                return

        await self.fallback_inline(event)

    @staticmethod
    def _fill_inline_event(event, pattern_match, command):
        """Copy the match groups and command metadata shared by both inline handlers onto *event*."""
        groups = pattern_match.groups()
        event.pattern_match = pattern_match
        event.args = groups[-1]
        event.other_args = groups[1:-1]
        event.command = groups[0]
        event.extra = command.extra
        event.object = command
        event.parse_mode = command.parse_mode

    async def handle_inline_photo(self, event: ExtendedInlineQuery, pattern_match, command):
        """Run an inline photo command and answer the query with a gallery.

        Each item returned by the command is either a plain photo or a list
        whose first element is the photo and second the accompanying text.
        Photos that fail to build are dropped from the answer.
        """
        builder = event.builder
        self._fill_inline_event(event, pattern_match, command)

        photo_list = await command.function(event)

        if not photo_list:
            return

        photo_coros = []

        for photo in photo_list:
            try:
                if isinstance(photo, list):
                    coro = inline_photos.photo(
                        event.client,
                        photo[0],
                        text=photo[1],
                        parse_mode=event.parse_mode
                    )
                else:
                    coro = builder.photo(photo)
            except Exception:
                print_exc()
                continue

            photo_coros.append(self.try_coro(coro))

        if not photo_coros:
            return

        photos = await asyncio.gather(*photo_coros)

        try:
            await event.answer([i for i in photos if i], gallery=True)
        except Exception:
            print_exc()

    async def handle_inline_article(self, event: ExtendedInlineQuery, pattern_match, command):
        """Run an inline article command and answer the query with its results.

        Each result is a mapping with ``title``, ``description`` and ``text``
        keys and an optional ``buttons`` key. Results that fail to build are
        printed and skipped.
        """
        builder = event.builder
        self._fill_inline_event(event, pattern_match, command)
        event.link_preview = command.link_preview

        result_list = await command.function(event)

        if not result_list:
            return

        articles = []

        for result in result_list:
            try:
                articles.append(
                    await builder.article(
                        title=result["title"],
                        description=result["description"],
                        text=result["text"],
                        link_preview=event.link_preview,
                        parse_mode=event.parse_mode,
                        buttons=result.get("buttons", None)
                    )
                )
            except Exception:
                print_exc()

        try:
            await event.answer([i for i in articles if i])
        except Exception:
            print_exc()

    async def handle_callback_query(self, event: ExtendedCallbackQuery):
        """Dispatch a callback query to every registered command with a matching id.

        The payload is read as ``<data_id>*<args>``; the text before the first
        ``*`` selects the command and the remainder is exposed as ``event.args``.
        """
        data_str = event.data.decode("utf-8")
        data_id = data_str.split("*")[0]
        data_data = data_str.removeprefix(data_id + "*")

        for command in self.callback_queries:
            if command.data_id != data_id:
                continue

            event.command = data_id
            event.args = data_data
            event.extra = command.extra
            event.object = command
            event.chat_db = None

            # Only queries not sent via an inline message get a chat database entry.
            if not event.via_inline:
                event.chat_db = self.db.get_chat((await event.get_chat()).id)

            allowed, denial = await self.check_privs(event, command, event.chat_db)

            if not allowed:
                await event.answer(denial)
                continue

            try:
                await command.function(event)
            except Exception as exception:
                # Print first so the traceback survives even if the reply fails.
                print_exc()

                try:
                    await event.reply(f"An error occurred in **{command.function.__name__}**: `{exception}`")
                except Exception:
                    print_exc()

    async def fallback_inline(self, event):
        """Answer an unmatched inline query with one article per command that has a default."""
        defaults_list = self.inline_photo_commands + self.inline_article_commands

        try:
            await event.answer([
                await event.builder.article(title=command.pattern, text=f"{self.loader.prefix()}{command.default}")
                for command in defaults_list if command.default
            ])
        except Exception:
            print_exc()

    async def try_coro(self, coro):
        """Await *coro* and return its result, or None if it raised an exception."""
        try:
            return await coro
        except Exception:
            # Best effort by design: a failed item is simply left out.
            return None

    @staticmethod
    def _chance_passes(command) -> bool:
        """Return True when the command has no ``chance`` set or the random roll succeeds."""
        return not command.chance or randint(0, 100) <= command.chance

    async def execute_command(self, event: ExtendedNewMessage, command: Command):
        """Run *command* for *event*, honouring its locking, cooldown and chance settings.

        Exceptions raised by the command are printed and, unless the command
        has ``silent_bail`` set, reported back as a reply. Locks and cooldown
        entries taken here are released even when the command fails.
        """
        try:
            if command.locking:
                if command.lock_reason:
                    await event.reply(f"That command is currently locked: {command.lock_reason}")
                    return

                if self._chance_passes(command):
                    command.lock_reason = f"In use by **{event.sender_id}** (`{event.raw_text}`)"

                    try:
                        await command.function(event)
                    finally:
                        command.lock_reason = None
            elif command.user_locking:
                if event.sender_id in command.locked_users:
                    await event.reply("Please don't spam that command.")
                    return

                if self._chance_passes(command):
                    command.locked_users.append(event.sender_id)

                    try:
                        await command.function(event)
                    finally:
                        if event.sender_id in command.locked_users:
                            command.locked_users.remove(event.sender_id)
            elif command.moderation:
                chat_id = event.chat.id

                if chat_id not in command.mod_cooldown_chats:
                    command.mod_cooldown_chats.append(chat_id)

                    try:
                        await command.function(event)
                        await asyncio.sleep(MODERATION_COMMAND_COOLDOWN_SEC)
                    finally:
                        # Always lift the cooldown, otherwise a failing command
                        # would leave this chat locked out for good.
                        try:
                            command.mod_cooldown_chats.remove(chat_id)
                        except ValueError:
                            pass
            elif self._chance_passes(command):
                await command.function(event)
        except Exception as exception:
            if not command.silent_bail:
                try:
                    await event.reply(f"An error occurred in **{command.function.__name__}**: `{exception}`")
                except Exception:
                    # Deliberately ignored: the traceback is printed below either way.
                    pass

            print_exc()

    async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None) -> tuple[bool, str|None]:
        """Decide whether the sender of *event* may use *command*.

        Returns a ``(allowed, message)`` tuple. ``allowed`` is True if the
        command can be used. ``message`` is an optional error string: for
        normal commands it is passed to ``event.reply``, for callback queries
        to ``event.answer``. It is None when the refusal should be silent.
        """
        def deny(message):
            return (False, None if command.silent_bail else message)

        if self.is_blacklisted(event) and not self._is_privileged(event):
            return (False, None)

        if isinstance(command, Command):
            if event.chat and chat_db:
                if command.nsfw and not chat_db.nsfw_enabled:
                    return deny(command.nsfw_warning or "NSFW commands are disabled in this chat!")

                if command.fun and not chat_db.fun_enabled:
                    return (False, None)

            if event.is_private and command.no_private:
                return deny("That command can't be used in private!")

            if not event.is_private and command.private_only:
                return deny("That command can only be used in private!")

        if command.owner and not self.is_owner(event):
            return deny("You lack the permissions to use that command!")

        if command.sudo and not self.is_sudo(event) and not self.is_owner(event):
            return deny("You lack the permissions to use that command!")

        if command.admin and event.chat and event.sender_id:
            # Admin commands are refused in private chats for everyone.
            if event.is_private:
                return deny("You lack the permissions to use that command!")

            # Owner and sudo users pass without asking Telegram for permissions.
            if not self._is_privileged(event):
                permissions = await event.client.get_permissions(event.chat, event.sender_id)

                if not permissions.is_admin:
                    return deny("You lack the permissions to use that command!")

        return (True, None)

    def is_owner(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
        """Return True when the sender's id (as a string) is listed under ``owner_id`` in the settings."""
        return str(event.sender_id) in self.settings.get_list("owner_id")

    def is_sudo(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
        """Return True when the sender is in the database's sudo user list."""
        return event.sender_id in self.db.sudo_users

    def is_blacklisted(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
        """Return True when the user behind *event* is in the database's blacklist."""
        # The parentheses matter: a conditional expression binds looser than
        # `in`, so without them the inline branch returned the bare user id.
        user_id = event.query.user_id if isinstance(event, ExtendedInlineQuery) else event.sender_id
        return user_id in self.db.blacklisted_users

    @property
    def all_incoming_commands(self) -> list[Command]:
        """A new list holding the strict incoming commands followed by the lenient ones."""
        return self.incoming_commands + self.incoming_lenient_commands

    def push_incoming_command(self, command: Command, lenient: bool = False):
        """Register *command* in the lenient list when *lenient* is set, else in the strict list."""
        target = self.incoming_lenient_commands if lenient else self.incoming_commands
        target.append(command)

    def clear_commands(self):
        """Empty every command registry in place.

        The lists are cleared rather than replaced because the message
        handlers registered in __init__ hold references to the incoming lists;
        rebinding the attributes would leave those handlers on the old lists.
        """
        self.incoming_commands.clear()
        self.incoming_lenient_commands.clear()
        self.inline_photo_commands.clear()
        self.inline_article_commands.clear()
        self.callback_queries.clear()