2020-03-16 21:02:05 -04:00
|
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
2023-10-16 09:46:31 -04:00
|
|
|
|
2020-05-16 17:52:29 -04:00
|
|
|
import asyncio
|
2023-10-16 09:46:31 -04:00
|
|
|
from re import escape, search
|
|
|
|
|
2020-06-09 21:05:53 -04:00
|
|
|
from telethon import events, types
|
2023-10-16 09:46:31 -04:00
|
|
|
|
|
|
|
|
|
|
|
class CommandHandler:
    """Dispatch Telethon messages and inline queries to registered commands.

    Three registries are kept, each mapping a command name (a regex fragment
    that is formatted into the matching template) to a dict describing the
    command:

    * ``incoming_commands`` -- entries are read for the keys ``"function"``,
      ``"noprefix"``, ``"sudo"``, ``"admin"`` and ``"extras"``.
    * ``inline_photo_commands`` / ``inline_article_commands`` -- entries are
      read for the keys ``"function"`` and ``"default"``.

    The registries start empty; whatever populates them lives outside this
    class.
    """

    def __init__(self, client, logger, settings):
        """Store collaborators and hook the two event handlers into *client*.

        Args:
            client: Telethon client; its loop is used to resolve the bot's own
                username, and both handlers are registered on it.
            logger: Object exposing ``warn(message)``, used to report command
                failures.
            settings: Object exposing ``get_config(key)``; the keys read by
                this class are ``"cmd_prefix"`` and ``"owner_id"``.
        """
        # Needed so that "command@username" addressed to this account matches.
        self.username = client.loop.run_until_complete(client.get_me()).username

        # {0} = escaped prefix (or ""), {1} = command, {2} = own username.
        # The final group captures everything after the command as its args.
        self.pattern_template = "(?is)^{0}{1}(?: |$|_|@{2}(?: |$|_))(.*)"
        # {0} = command; inline queries carry no prefix and no @username.
        self.inline_pattern_template = "(?is)^{0}(?: |$|_)(.*)"

        self.incoming_commands = {}
        self.inline_photo_commands = {}
        self.inline_article_commands = {}

        self.logger = logger
        self.settings = settings

        client.add_event_handler(self.handle_incoming, events.NewMessage(incoming=True))
        client.add_event_handler(self.handle_inline, events.InlineQuery())

    def _is_owner(self, event):
        """Return True when the sender's ID is listed in the ``owner_id`` setting.

        The setting is treated as a comma-separated list of IDs. A missing
        setting means "no owners" rather than an error, and whitespace around
        each ID is ignored.
        """
        raw_owners = str(self.settings.get_config("owner_id") or "")
        owner_ids = [owner.strip() for owner in raw_owners.split(",")]
        # sender_id is a plain integer on both old and new Telethon releases,
        # whereas from_id became a Peer object and would never match.
        return str(event.sender_id) in owner_ids

    async def handle_incoming(self, event):
        """Run every registered incoming command whose pattern matches *event*.

        Sets ``event.pattern_match``, ``event.args`` (the trailing capture
        group) and ``event.extras`` before awaiting the command callback.
        Commands flagged ``"sudo"`` only run for owners; commands flagged
        ``"admin"`` run for owners or for users accepted by ``check_admin``.

        Raises:
            Exception: whatever the command callback raised, after it has been
                logged and reported back as a reply to the message.
        """
        prefix = escape(self.settings.get_config("cmd_prefix") or ".")
        text = event.text or ""

        # No break on match: every command whose pattern matches is executed.
        for key, value in self.incoming_commands.items():
            pattern = self.pattern_template.format(
                "" if value["noprefix"] else prefix, key, self.username
            )
            pattern_match = search(pattern, text)

            if not pattern_match:
                continue

            if value["sudo"] and not self._is_owner(event):
                print(f"Attempted sudo command ({event.text}) from ID {event.sender_id}")
                continue

            if value["admin"] and not self._is_owner(event) and not await self.check_admin(event):
                print(f"Attempted admin command ({event.text}) from ID {event.sender_id}")
                continue

            event.pattern_match = pattern_match
            event.args = pattern_match.groups()[-1]
            event.extras = value["extras"]

            try:
                await value["function"](event)
            except Exception as exception:
                self.logger.warn(f"{value['function'].__name__} - {exception}")
                await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
                # Bare raise keeps the original traceback intact.
                raise

    async def handle_inline(self, event):
        """Answer an inline query with the first matching inline command.

        Photo commands are tried before article commands; when nothing
        matches, ``fallback_inline`` answers instead.
        """
        text = event.text or ""
        registries = (
            (self.inline_photo_commands, self.handle_inline_photo),
            (self.inline_article_commands, self.handle_inline_article),
        )

        for commands, handler in registries:
            for key, value in commands.items():
                pattern_match = search(self.inline_pattern_template.format(key), text)

                if pattern_match:
                    await handler(event, pattern_match, value)
                    return

        await self.fallback_inline(event)

    async def handle_inline_photo(self, event, pattern_match, value):
        """Answer *event* with a gallery built from the URLs the command returns.

        The command callback is awaited with ``event.pattern_match`` and
        ``event.args`` set; a falsy result means no answer is sent. Photos
        that fail to build are dropped from the gallery.
        """
        builder = event.builder
        event.pattern_match = pattern_match
        event.args = pattern_match.groups()[-1]

        url_list = await value["function"](event)

        if not url_list:
            return

        # Build all photos concurrently; try_coro turns a failed build into None.
        photos = await asyncio.gather(
            *(self.try_coro(builder.photo(url)) for url in url_list)
        )

        try:
            await event.answer([photo for photo in photos if photo], gallery=True)
        except Exception:
            # Best effort: a failed answer is ignored, but cancellation and
            # interpreter shutdown are no longer swallowed.
            pass

    async def handle_inline_article(self, event, pattern_match, value):
        """Answer *event* with articles built from the dicts the command returns.

        Each result is read for the keys ``"title"``, ``"description"`` and
        ``"text"``; results that cannot be turned into an article are skipped.
        """
        builder = event.builder
        event.pattern_match = pattern_match
        event.args = pattern_match.groups()[-1]

        result_list = await value["function"](event)

        if not result_list:
            return

        articles = []

        for result in result_list:
            try:
                article = await builder.article(
                    title=result["title"],
                    description=result["description"],
                    text=result["text"],
                )
            except Exception:
                # Skip this result (missing key or builder failure), keep the rest.
                continue

            articles.append(article)

        try:
            await event.answer([article for article in articles if article])
        except Exception:
            # Best effort, same as the photo handler.
            pass

    async def fallback_inline(self, event):
        """Answer *event* with one article per inline command that has a default.

        Each article is titled with the command name and its text is the
        configured command prefix (``"."`` when unset) followed by the
        command's ``"default"`` value. On a name clash between the two
        registries the article command wins.
        """
        defaults_dict = {**self.inline_photo_commands, **self.inline_article_commands}

        try:
            prefix = self.settings.get_config("cmd_prefix") or "."
            articles = [
                await event.builder.article(title=key, text=f"{prefix}{value['default']}")
                for key, value in defaults_dict.items()
                if value["default"]
            ]
            await event.answer(articles)
        except Exception:
            # Best effort: a failed fallback answer is ignored.
            pass

    async def try_coro(self, coro):
        """Await *coro* and return its result, or None if it raised an Exception.

        Cancellation (``asyncio.CancelledError``) is deliberately not caught so
        that cancelling the surrounding task still works.
        """
        try:
            return await coro
        except Exception:
            return None

    async def check_admin(self, event):
        """Return True when the sender of *event* is among the chat's admins.

        Iterates up to 10000 participants of ``event.chat`` using the
        ``ChannelParticipantsAdmins`` filter.
        """
        async for user in event.client.iter_participants(event.chat, limit=10000, filter=types.ChannelParticipantsAdmins):
            if user.id == event.sender_id:
                return True

        return False
|