2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-22 18:19:16 +00:00
microbot/ubot/command_handler.py
2023-10-16 09:54:02 -04:00

211 lines
7.8 KiB
Python

# SPDX-License-Identifier: GPL-2.0-or-later
import asyncio
from re import escape, search
from telethon import events, types
from .fixes import inline_photos
class CommandHandler():
def __init__(self, client, logger, settings):
self.username = client.loop.run_until_complete(client.get_me()).username
self.pattern_template = "(?is)^{0}{1}(?: |$|_|@{2}(?: |$|_))(.*)"
self.inline_pattern_template = "(?is)^{0}(?: |$|_)(.*)"
self.incoming_commands = {}
self.inline_photo_commands = {}
self.inline_article_commands = {}
self.callback_queries = {}
self.logger = logger
self.settings = settings
client.add_event_handler(self.handle_incoming, events.NewMessage(incoming=True))
client.add_event_handler(self.handle_inline, events.InlineQuery())
client.add_event_handler(self.handle_callback_query, events.CallbackQuery())
async def handle_incoming(self, event):
prefix = escape(self.settings.get_config("cmd_prefix") or '.')
for key, value in self.incoming_commands.items():
pattern_match = search(self.pattern_template.format("" if value["noprefix"] else prefix, key, self.username), event.text)
if pattern_match:
if self.is_blacklisted(event):
print(f"Attempted command ({event.text}) from blacklisted ID {event.from_id}")
return
if value["owner"] and not self.is_owner(event):
print(f"Attempted owner command ({event.text}) from ID {event.from_id}")
continue
elif value["sudo"] and not self.is_sudo(event) and not self.is_owner(event):
print(f"Attempted sudo command ({event.text}) from ID {event.from_id}")
continue
elif value["admin"] and not await self.is_admin(event) and not self.is_sudo(event) and not self.is_owner(event):
print(f"Attempted admin command ({event.text}) from ID {event.from_id}")
continue
if value["nsfw"] and str(event.chat.id) in self.settings.get_list("nsfw_blacklist"):
print(f"Attempted NSFW command ({event.text}) in blacklisted chat ({event.chat.id}) from ID {event.from_id}")
continue
event.pattern_match = pattern_match
event.args = pattern_match.groups()[-1]
event.extras = value["extras"]
try:
if value["locking"]:
if value["locked"]:
await event.reply(f"That command is currently locked, try again later!")
continue
else:
value["locked"] = True
await value["function"](event)
value["locked"] = False
else:
await value["function"](event)
except Exception as exception:
value["locked"] = False
self.logger.warn(f"{value['function'].__name__} - {exception}")
await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
raise exception
async def handle_inline(self, event):
for key, value in self.inline_photo_commands.items():
pattern_match = search(self.inline_pattern_template.format(key), event.text)
if pattern_match:
if self.is_blacklisted(event, True):
print(f"Attempted command ({event.text}) from blacklisted ID {event.from_id}")
return
await self.handle_inline_photo(event, pattern_match, value)
return
for key, value in self.inline_article_commands.items():
pattern_match = search(self.inline_pattern_template.format(key), event.text)
if pattern_match:
if self.is_blacklisted(event, True):
print(f"Attempted command ({event.text}) from blacklisted ID {event.from_id}")
return
await self.handle_inline_article(event, pattern_match, value)
return
await self.fallback_inline(event)
async def handle_inline_photo(self, event, pattern_match, value):
builder = event.builder
event.pattern_match = pattern_match
event.args = pattern_match.groups()[-1]
photo_list = await value["function"](event)
if not photo_list:
return
photo_coros = []
for photo in photo_list:
try:
if isinstance(photo, list):
photo_coros += [self.try_coro(inline_photos.photo(event.client, photo[0], text=photo[1]))]
else:
photo_coros += [self.try_coro(builder.photo(photo))]
except:
pass
if photo_coros:
photos = await asyncio.gather(*photo_coros)
else:
return
try:
await event.answer([i for i in photos if i], gallery=True)
except:
pass
async def handle_inline_article(self, event, pattern_match, value):
builder = event.builder
event.pattern_match = pattern_match
event.args = pattern_match.groups()[-1]
result_list = await value["function"](event)
if not result_list:
return
articles = []
for result in result_list:
try:
articles += [await builder.article(title=result["title"], description=result["description"], text=result["text"])]
except:
pass
try:
await event.answer([i for i in articles if i])
except:
pass
async def handle_callback_query(self, event):
data_str = event.data.decode("utf-8")
data_id = data_str.split("*")[0]
data_data = data_str.lstrip(data_id + "*")
for key, value in self.callback_queries.items():
if key == data_id:
event.args = data_data
event.extras = value["extras"]
try:
await value["function"](event)
except Exception as exception:
self.logger.warn(f"{value['function'].__name__} - {exception}")
await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
raise exception
async def fallback_inline(self, event):
defaults_dict = {**self.inline_photo_commands, **self.inline_article_commands}
try:
await event.answer([await event.builder.article(title=key, text=f"{self.settings.get_config('cmd_prefix') or '.'}{value['default']}") for key, value in defaults_dict.items() if value["default"]])
except:
pass
async def try_coro(self, coro):
try:
return await coro
except:
return
def is_owner(self, event):
if str(event.from_id) in self.settings.get_list("owner_id"):
return True
else:
return False
def is_sudo(self, event):
if str(event.from_id) in self.settings.get_list("sudo_users"):
return True
else:
return False
async def is_admin(self, event):
async for user in event.client.iter_participants(event.chat, limit=10000, filter=types.ChannelParticipantsAdmins):
if user.id == event.from_id:
return True
return False
def is_blacklisted(self, event, inline=False):
if inline:
user_id = event.query.user_id
else:
user_id = event.from_id
if str(user_id) in self.settings.get_list("blacklisted_users"):
return True
else:
return False