2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-22 18:19:16 +00:00
microbot/ubot/command_handler.py

256 lines
10 KiB
Python
Raw Normal View History

2020-03-16 21:02:05 -04:00
# SPDX-License-Identifier: GPL-2.0-or-later
2023-10-16 09:46:31 -04:00
2020-05-16 17:52:29 -04:00
import asyncio
2020-06-29 15:29:20 -04:00
from random import randint
2023-10-16 09:46:31 -04:00
from re import escape, search
2020-06-20 10:13:45 -04:00
from telethon import events, functions, types
2023-10-16 09:46:31 -04:00
2020-06-15 13:04:35 -04:00
from .fixes import inline_photos
2023-10-16 09:46:31 -04:00
class CommandHandler():
    """Dispatch incoming messages, inline queries and callback queries to commands.

    Commands are described by dicts stored in the class-level registries
    below. Each handler matches the event text against the registered
    patterns, performs privilege/blacklist checks and then awaits the
    command's ``value["function"]`` with the (annotated) event.
    """

    # Regex templates, completed with str.format() right before matching:
    #   pattern_template:        {0} prefix group, {1} command pattern, {2} bot username
    #   inline_pattern_template: {0} command pattern (no prefix)
    #   raw_pattern_template:    {0} pattern used as-is
    pattern_template = "(?is)^{0}({1})(?: |$|_|@{2}(?: |$|_))(.*)"
    inline_pattern_template = "(?is)^({0})(?: |$|_)(.*)"
    raw_pattern_template = "(?is){0}"

    # Command registries. These are class attributes, so they are shared by
    # every CommandHandler instance. Nothing in this class appends to them;
    # presumably the loader/decorators populate them -- verify against callers.
    incoming_commands = []
    inline_photo_commands = []
    inline_article_commands = []
    callback_queries = []

    def __init__(self, client, settings, loader):
        """Store collaborators and register the three event handlers on ``client``.

        Blocks on ``client.get_me()`` (via the client's loop) to learn the
        account's username, which is later interpolated into
        ``pattern_template`` to recognise ``command@username``.
        """
        self.username = client.loop.run_until_complete(client.get_me()).username
        self.settings = settings
        self.loader = loader
        client.add_event_handler(self.handle_incoming, events.NewMessage(incoming=True))
        client.add_event_handler(self.handle_inline, events.InlineQuery())
        client.add_event_handler(self.handle_callback_query, events.CallbackQuery())

    async def handle_incoming(self, event):
        """Match an incoming message against every registered command and run matches.

        Processing stops entirely (``return``) when a matching command fails
        the privilege checks or is disabled in the chat; otherwise the loop
        keeps going, so several commands may run for one message.
        """
        prefix = "|".join([escape(i) for i in (self.settings.get_list("cmd_prefix") or ['.'])])

        for value in self.incoming_commands:
            full_pattern = value["pattern"] + value["pattern_extra"]

            if value["simple_pattern"]:
                regex = self.inline_pattern_template.format(full_pattern)
            elif value["raw_pattern"]:
                regex = self.raw_pattern_template.format(full_pattern)
            else:
                regex = self.pattern_template.format(f"({prefix})", full_pattern, self.username)

            pattern_match = search(regex, event.raw_text)

            if not pattern_match:
                continue

            if not await self.check_privs(event, value):
                return

            if value["pass_nsfw"]:
                event.nsfw_disabled = str(event.chat.id) in self.settings.get_list("nsfw_blacklist")

            groups = pattern_match.groups()

            # Group index 1 is the command name for the prefixed template
            # (index 0 is the prefix). Guarded so that a raw pattern with
            # fewer than two groups no longer raises IndexError.
            # NOTE(review): for simple patterns index 1 is not the command
            # name (the inline handlers use index 0 for the same template);
            # kept as-is for compatibility -- confirm intended behaviour.
            event.command = groups[1] if len(groups) > 1 else None

            if event.command in self.loader.db.get_disabled_commands(event.chat.id):
                print(f"Attempted command ({event.raw_text}) in chat which disabled it ({event.chat.id}) from ID {event.from_id}")
                return

            event.pattern_match = pattern_match
            # The last group may be missing or unmatched (None) for raw patterns.
            event.args = ((groups[-1] if groups else None) or "").strip()
            event.other_args = groups[2:-1]
            event.extra = value["extra"]

            await self.execute_command(event, value)

    async def handle_inline(self, event):
        """Answer an inline query with the first matching photo or article command.

        Photo commands are tried before article commands. If nothing
        matches, ``fallback_inline`` is used. Queries from blacklisted users
        are logged and left unanswered.
        """
        dispatch = (
            (self.inline_photo_commands, self.handle_inline_photo),
            (self.inline_article_commands, self.handle_inline_article),
        )

        for commands, handler in dispatch:
            for value in commands:
                pattern_match = search(self.inline_pattern_template.format(value["pattern"]), event.text)

                if not pattern_match:
                    continue

                if self.is_blacklisted(event, True):
                    # Use the same ID source as is_blacklisted() does for
                    # inline events instead of event.from_id.
                    print(f"Attempted command ({event.text}) from blacklisted ID {event.query.user_id}")
                    return

                await handler(event, pattern_match, value)
                return

        await self.fallback_inline(event)

    async def handle_inline_photo(self, event, pattern_match, value):
        """Run an inline photo command and answer the query with a gallery.

        The command function returns a list whose items are either something
        accepted by ``builder.photo`` or a ``[photo, text]`` list, which is
        routed through ``inline_photos.photo``. Items that fail to build are
        skipped.
        """
        builder = event.builder
        groups = pattern_match.groups()
        event.pattern_match = pattern_match
        event.args = groups[-1]
        event.other_args = groups[1:-1]
        event.command = groups[0]

        photo_list = await value["function"](event)

        if not photo_list:
            return

        photo_coros = []

        for photo in photo_list:
            try:
                if isinstance(photo, list):
                    photo_coros.append(self.try_coro(inline_photos.photo(event.client, photo[0], text=photo[1])))
                else:
                    photo_coros.append(self.try_coro(builder.photo(photo)))
            except Exception:
                # Best effort: a malformed entry must not prevent the others.
                pass

        if not photo_coros:
            return

        # try_coro() turns individual failures into None results.
        photos = await asyncio.gather(*photo_coros)

        try:
            await event.answer([i for i in photos if i], gallery=True)
        except Exception:
            # Best effort: answering can fail (e.g. the query expired).
            pass

    async def handle_inline_article(self, event, pattern_match, value):
        """Run an inline article command and answer the query with its results.

        The command function returns a list of dicts with the keys
        ``title``, ``description`` and ``text``; entries that fail to build
        are skipped.
        """
        builder = event.builder
        groups = pattern_match.groups()
        event.pattern_match = pattern_match
        event.args = groups[-1]
        # NOTE(review): unlike handle_inline_photo this slice keeps the
        # command name at index 0 and event.command is not set; kept as-is
        # because handlers may depend on it -- confirm.
        event.other_args = groups[:-1]

        result_list = await value["function"](event)

        if not result_list:
            return

        articles = []

        for result in result_list:
            try:
                articles.append(await builder.article(title=result["title"], description=result["description"], text=result["text"]))
            except Exception:
                # Best effort: skip results that cannot be turned into articles.
                pass

        try:
            await event.answer([i for i in articles if i])
        except Exception:
            # Best effort: answering can fail (e.g. the query expired).
            pass

    async def handle_callback_query(self, event):
        """Dispatch a callback query whose data has the form ``<data_id>*<args>``.

        Every registered callback whose ``data_id`` equals the part before
        the first ``*`` is awaited with ``event.args`` set to the remainder.
        Errors are reported to the chat and re-raised.
        """
        data_str = event.data.decode("utf-8")
        # partition() splits on the first "*" only. The previous
        # lstrip(data_id + "*") stripped a *character set*, which also ate
        # leading payload characters that happened to occur in data_id.
        data_id, _, data_data = data_str.partition("*")

        for value in self.callback_queries:
            if value["data_id"] != data_id:
                continue

            event.args = data_data
            event.extra = value["extra"]

            try:
                await value["function"](event)
            except Exception as exception:
                await event.reply(f"An error occurred in **{value['function'].__name__}**: `{exception}`")
                raise

    async def fallback_inline(self, event):
        """Answer an unmatched inline query with every command that has a default."""
        defaults_list = self.inline_photo_commands + self.inline_article_commands

        try:
            await event.answer([await event.builder.article(title=value["pattern"], text=f"{self.loader.prefix()}{value['default']}") for value in defaults_list if value["default"]])
        except Exception:
            # Best effort: the fallback listing is purely a convenience.
            pass

    async def try_coro(self, coro):
        """Await ``coro`` and return its result, or ``None`` if it raises.

        Only ``Exception`` is suppressed so that task cancellation and
        interpreter shutdown still propagate.
        """
        try:
            return await coro
        except Exception:
            return None

    @staticmethod
    def _passes_chance(value):
        """Return True if the command should run given its optional ``chance``.

        A falsy ``chance`` always passes; otherwise a roll of
        ``randint(0, 100)`` must not exceed it.
        """
        chance = value["chance"]
        return not chance or randint(0, 100) <= chance

    async def execute_command(self, event, value):
        """Run a command function, honouring its locking mode and chance.

        ``locking`` allows one concurrent invocation overall; ``userlocking``
        allows one per sender. A lock is released in a ``finally`` by the
        invocation that took it, so it is freed on errors and cancellation
        but never cleared on behalf of another running invocation. Errors
        are reported to the chat and re-raised.
        """
        try:
            if value["locking"]:
                if value["lockreason"]:
                    await event.reply(f"That command is currently locked: {value['lockreason']}")
                    return

                if self._passes_chance(value):
                    value["lockreason"] = f"In use by **{event.from_id}** (`{event.raw_text}`)"
                    try:
                        await value["function"](event)
                    finally:
                        value["lockreason"] = None
            elif value["userlocking"]:
                if event.from_id in value["lockedusers"]:
                    await event.reply("Please don't spam that command.")
                    return

                if self._passes_chance(value):
                    value["lockedusers"].append(event.from_id)
                    try:
                        await value["function"](event)
                    finally:
                        if event.from_id in value["lockedusers"]:
                            value["lockedusers"].remove(event.from_id)
            else:
                if self._passes_chance(value):
                    await value["function"](event)
        except Exception as exception:
            await event.reply(f"An error occurred in **{value['function'].__name__}**: `{exception}`")
            raise

    async def check_privs(self, event, value):
        """Return True if the sender may run the command in this chat.

        Checks, in order: user blacklist (owners and sudo users are exempt),
        owner-only, sudo-only, admin-only, NSFW chat blacklist and fun chat
        blacklist. Every rejection is printed; all but the user-blacklist
        and fun-blacklist rejections also reply to the sender.
        """
        if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
            print(f"Attempted command ({event.raw_text}) from blacklisted ID {event.from_id}")
            return False

        if value["owner"] and not self.is_owner(event):
            await event.reply("You lack the permissions to use that command!")
            print(f"Attempted owner command ({event.raw_text}) from ID {event.from_id}")
            return False

        if value["sudo"] and not self.is_sudo(event) and not self.is_owner(event):
            await event.reply("You lack the permissions to use that command!")
            print(f"Attempted sudo command ({event.raw_text}) from ID {event.from_id}")
            return False

        if value["admin"] and not await self.is_admin(event) and not self.is_sudo(event) and not self.is_owner(event):
            await event.reply("You lack the permissions to use that command!")
            print(f"Attempted admin command ({event.raw_text}) from ID {event.from_id}")
            return False

        if value["nsfw"] and str(event.chat.id) in self.settings.get_list("nsfw_blacklist"):
            await event.reply("NSFW commands are disabled in this chat!")
            print(f"Attempted NSFW command ({event.raw_text}) in blacklisted chat ({event.chat.id}) from ID {event.from_id}")
            return False

        if value["fun"] and str(event.chat.id) in self.settings.get_list("fun_blacklist"):
            print(f"Attempted fun command ({event.raw_text}) in blacklisted chat ({event.chat.id}) from ID {event.from_id}")
            return False

        return True

    def is_owner(self, event):
        """Return True if the sender's ID is listed under the ``owner_id`` setting."""
        return str(event.from_id) in self.settings.get_list("owner_id")

    def is_sudo(self, event):
        """Return True if the sender's ID is listed under the ``sudo_users`` setting."""
        return str(event.from_id) in self.settings.get_list("sudo_users")

    async def is_admin(self, event):
        """Return True in private chats or if the sender is a channel admin/creator."""
        if event.is_private:
            return True

        channel_participant = await event.client(functions.channels.GetParticipantRequest(event.chat, event.from_id))
        return isinstance(channel_participant.participant, (types.ChannelParticipantAdmin, types.ChannelParticipantCreator))

    def is_blacklisted(self, event, inline=False):
        """Return True if the user behind ``event`` is in ``blacklisted_users``.

        Inline query events carry the user ID on ``event.query.user_id``;
        message events use ``event.from_id``.
        """
        user_id = event.query.user_id if inline else event.from_id
        return str(user_id) in self.settings.get_list("blacklisted_users")