mirror of
https://github.com/Nick80835/microbot
synced 2025-08-22 01:58:17 +00:00
Compare commits
5 Commits
8857b4710d
...
0119698e81
Author | SHA1 | Date | |
---|---|---|---|
|
0119698e81 | ||
|
4aae840a8c | ||
|
ffea3fe581 | ||
|
30238f088a | ||
|
c799f34f89 |
@ -4,13 +4,15 @@ from logging import INFO, basicConfig, getLogger
|
||||
from time import time
|
||||
|
||||
import telethon
|
||||
from telethon import TelegramClient
|
||||
from telethon.errors.rpcerrorlist import (AccessTokenExpiredError,
|
||||
AccessTokenInvalidError,
|
||||
TokenInvalidError)
|
||||
from telethon.network.connection.tcpabridged import \
|
||||
ConnectionTcpAbridged as CTA
|
||||
|
||||
from .custom import ExtendedEvent
|
||||
from .custom import (ExtendedCallbackQuery, ExtendedInlineQuery,
|
||||
ExtendedNewMessage)
|
||||
from .loader import Loader
|
||||
from .settings import Settings
|
||||
|
||||
@ -26,9 +28,9 @@ loop = asyncio.get_event_loop()
|
||||
|
||||
class MicroBot():
|
||||
settings = Settings()
|
||||
client = None
|
||||
logger = None
|
||||
loader = None
|
||||
logger = logger
|
||||
client: TelegramClient
|
||||
loader: Loader
|
||||
|
||||
def __init__(self):
|
||||
loop.run_until_complete(self._initialize_bot())
|
||||
@ -37,7 +39,7 @@ class MicroBot():
|
||||
global ldr
|
||||
|
||||
try:
|
||||
self.client = await telethon.TelegramClient(
|
||||
self.client = await TelegramClient(
|
||||
self.settings.get_config("session_name", "bot0") or "bot0",
|
||||
self.settings.get_config("api_id"),
|
||||
self.settings.get_config("api_hash"),
|
||||
@ -76,7 +78,10 @@ class MicroBot():
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
telethon.events.NewMessage.Event = ExtendedEvent
|
||||
telethon.events.NewMessage.Event = ExtendedNewMessage
|
||||
telethon.events.CallbackQuery.Event = ExtendedCallbackQuery
|
||||
telethon.events.InlineQuery.Event = ExtendedInlineQuery
|
||||
|
||||
micro_bot = MicroBot()
|
||||
|
||||
try:
|
||||
|
@ -6,6 +6,7 @@ class Command:
|
||||
self.module = func.__module__.split(".")[-1]
|
||||
self.function = func
|
||||
self.data = {}
|
||||
self.uses = 0
|
||||
|
||||
self.pattern = args.get("pattern")
|
||||
self.simple_pattern = args.get("simple_pattern", False)
|
||||
|
@ -8,6 +8,11 @@ from telethon import events
|
||||
from telethon.errors.rpcerrorlist import (ChatAdminRequiredError,
|
||||
ChatWriteForbiddenError)
|
||||
|
||||
from ubot.command import CallbackQueryCommand, Command
|
||||
from ubot.custom import (ExtendedCallbackQuery, ExtendedInlineQuery,
|
||||
ExtendedNewMessage)
|
||||
from ubot.database import ChatWrapper
|
||||
|
||||
from .fixes import inline_photos
|
||||
|
||||
|
||||
@ -39,7 +44,7 @@ class CommandHandler():
|
||||
if not isinstance(exception, (ChatAdminRequiredError, ChatWriteForbiddenError)):
|
||||
await event.client.send_message(int(self.settings.get_list("owner_id")[0]), str(format_exc()))
|
||||
|
||||
async def handle_incoming(self, event):
|
||||
async def handle_incoming(self, event: ExtendedNewMessage):
|
||||
chat_db = self.db.get_chat((await event.get_chat()).id)
|
||||
chat_prefix = chat_db.prefix
|
||||
|
||||
@ -57,6 +62,8 @@ class CommandHandler():
|
||||
pattern_match = search(self.pattern_template.format(f"({'|'.join([escape(i) for i in prefix_list])})", command.pattern + command.pattern_extra, self.micro_bot.me.username), event.raw_text, IGNORECASE|DOTALL)
|
||||
|
||||
if pattern_match:
|
||||
command.uses += 1
|
||||
|
||||
if command.moderation and not chat_db.modmode_enabled:
|
||||
continue
|
||||
|
||||
@ -100,7 +107,7 @@ class CommandHandler():
|
||||
|
||||
await self.execute_command(event, command)
|
||||
|
||||
async def handle_inline(self, event):
|
||||
async def handle_inline(self, event: ExtendedInlineQuery):
|
||||
for command in self.inline_photo_commands:
|
||||
pattern_match = search(self.simple_pattern_template.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)
|
||||
|
||||
@ -123,7 +130,7 @@ class CommandHandler():
|
||||
|
||||
await self.fallback_inline(event)
|
||||
|
||||
async def handle_inline_photo(self, event, pattern_match, command):
|
||||
async def handle_inline_photo(self, event: ExtendedInlineQuery, pattern_match, command):
|
||||
builder = event.builder
|
||||
event.pattern_match = pattern_match
|
||||
event.args = pattern_match.groups()[-1]
|
||||
@ -166,7 +173,7 @@ class CommandHandler():
|
||||
except:
|
||||
print_exc()
|
||||
|
||||
async def handle_inline_article(self, event, pattern_match, command):
|
||||
async def handle_inline_article(self, event: ExtendedInlineQuery, pattern_match, command):
|
||||
builder = event.builder
|
||||
event.pattern_match = pattern_match
|
||||
event.args = pattern_match.groups()[-1]
|
||||
@ -204,7 +211,7 @@ class CommandHandler():
|
||||
except:
|
||||
print_exc()
|
||||
|
||||
async def handle_callback_query(self, event):
|
||||
async def handle_callback_query(self, event: ExtendedCallbackQuery):
|
||||
data_str = event.data.decode("utf-8")
|
||||
data_id = data_str.split("*")[0]
|
||||
data_data = data_str.removeprefix(data_id + "*")
|
||||
@ -283,7 +290,7 @@ class CommandHandler():
|
||||
|
||||
# returns True if the command can be used, False if not, and an optional error string together in a tuple
|
||||
# for normal commands, this will be passed to event.reply; for callback queries this will call event.answer
|
||||
async def check_privs(self, event, command, chat_db = None, callback_query = False) -> tuple[bool, str|None]:
|
||||
async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None, callback_query = False) -> tuple[bool, str|None]:
|
||||
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
|
||||
return (False, None)
|
||||
|
||||
@ -301,10 +308,10 @@ class CommandHandler():
|
||||
if event.is_private or not (await event.client.get_permissions(event.chat, event.sender_id)).is_admin and not self.is_sudo(event) and not self.is_owner(event):
|
||||
return (False, None if command.silent_bail else "You lack the permissions to use that command!")
|
||||
|
||||
if not callback_query and event.chat and command.nsfw and not chat_db.nsfw_enabled:
|
||||
if not callback_query and event.chat and command.nsfw and (chat_db and not chat_db.nsfw_enabled):
|
||||
return (False, None if command.silent_bail else command.nsfw_warning or "NSFW commands are disabled in this chat!")
|
||||
|
||||
if not callback_query and event.chat and command.fun and not chat_db.fun_enabled:
|
||||
if not callback_query and event.chat and command.fun and (chat_db and not chat_db.fun_enabled):
|
||||
return (False, None)
|
||||
|
||||
return (True, None)
|
||||
|
@ -1,10 +1,34 @@
|
||||
from re import Match
|
||||
from typing import Any
|
||||
|
||||
from telethon.events.callbackquery import CallbackQuery
|
||||
from telethon.events.inlinequery import InlineQuery
|
||||
from telethon.events.newmessage import NewMessage
|
||||
from telethon.tl.types import (DocumentAttributeFilename,
|
||||
DocumentAttributeImageSize,
|
||||
DocumentAttributeSticker)
|
||||
DocumentAttributeSticker,
|
||||
MessageEntityMentionName)
|
||||
|
||||
from ubot.command import (CallbackQueryCommand, Command, InlineArticleCommand,
|
||||
InlinePhotoCommand)
|
||||
from ubot.database import ChatWrapper
|
||||
|
||||
|
||||
class ExtendedEvent(NewMessage.Event):
|
||||
class ExtendedNewMessage(NewMessage.Event):
|
||||
pattern_match: Match[str] # pattern match as returned by re.search when it's used in the command handler
|
||||
chat_db: ChatWrapper # database reference for the chat this command was executed in
|
||||
object: Command # the object constructed when the command associated with this event was added
|
||||
command: str # the base command with no prefix, no args and no other_args; the whole pattern if raw_pattern is used
|
||||
prefix: str # prefix used to call this command, such as "/" or "g."; not set if simple_pattern/raw_pattern is used
|
||||
extra: Any # any object you set to extra when registering the command associated with this event
|
||||
args: str # anything after the command itself and any groups caught in other_args, such as booru tags
|
||||
other_args: tuple # any groups between the args group and the command itself
|
||||
nsfw_disabled: bool # only set if pass_nsfw is True; this value is the opposite of nsfw_enabled in chat_db
|
||||
|
||||
@property
|
||||
def has_user_entities(self) -> bool:
|
||||
return any([i for i in self.entities if isinstance(i, MessageEntityMentionName)]) if self.entities else False
|
||||
|
||||
async def get_text(self, return_msg=False, default=""):
|
||||
if self.args:
|
||||
if return_msg:
|
||||
@ -58,3 +82,23 @@ class ExtendedEvent(NewMessage.Event):
|
||||
return await self.message.respond(*args, **kwargs|{"reply_to": self.reply_to.reply_to_msg_id})
|
||||
|
||||
return await self.message.respond(*args, **kwargs)
|
||||
|
||||
|
||||
class ExtendedCallbackQuery(CallbackQuery.Event):
|
||||
chat_db: ChatWrapper|None
|
||||
object: CallbackQueryCommand
|
||||
command: str
|
||||
extra: Any
|
||||
args: str
|
||||
|
||||
|
||||
class ExtendedInlineQuery(InlineQuery.Event):
|
||||
pattern_match: Match[str]
|
||||
parse_mode: str
|
||||
object: InlineArticleCommand|InlinePhotoCommand
|
||||
command: str
|
||||
extra: Any
|
||||
args: str
|
||||
other_args: tuple
|
||||
nsfw_disabled: bool
|
||||
link_preview: bool
|
||||
|
@ -4,7 +4,7 @@ from telethon.tl.types import Channel, Chat, MessageEntityMentionName
|
||||
|
||||
|
||||
async def get_user(event, allow_channel=False):
|
||||
if mention_entities := [i for i in event.get_entities_text() if isinstance(i, MessageEntityMentionName)]:
|
||||
if mention_entities := [i for i in event.entities if isinstance(i, MessageEntityMentionName)] if event.entities else False:
|
||||
if len(mention_entities) > 1:
|
||||
await event.reply("You provided too many arguments!")
|
||||
return
|
||||
|
@ -160,6 +160,11 @@ async def dbstat(event):
|
||||
)
|
||||
|
||||
|
||||
@ldr.add("usage", owner=True, hide_help=True)
|
||||
async def usagestat(event):
|
||||
await event.reply("\n".join(f"{command.pattern}: {command.uses}" for command in ldr.command_handler.incoming_commands if command.uses))
|
||||
|
||||
|
||||
@ldr.add("shutdown", pattern_extra="(f|)", owner=True, hide_help=True)
|
||||
async def shutdown(event):
|
||||
await event.reply("Goodbye…")
|
||||
|
@ -18,7 +18,7 @@ async def kick_user(event):
|
||||
await event.reply("I can't kick users in this chat.")
|
||||
return
|
||||
|
||||
if time_regex.sub("", event.args).strip().lower() == "me":
|
||||
if time_regex.sub("", event.args).strip().lower() == "me" and not event.has_user_entities:
|
||||
self_harm = True
|
||||
user_to_kick = await event.get_sender()
|
||||
else:
|
||||
@ -63,7 +63,7 @@ async def ban_user(event):
|
||||
if time_match := time_regex.search(event.args):
|
||||
event.args = time_regex.sub("", event.args).strip()
|
||||
|
||||
if time_regex.sub("", event.args).strip().lower() == "me":
|
||||
if time_regex.sub("", event.args).strip().lower() == "me" and not event.has_user_entities:
|
||||
await event.reply("I don't think I should do that…")
|
||||
return
|
||||
|
||||
@ -105,6 +105,10 @@ async def ban_user(event):
|
||||
@ldr.add("unban", moderation=True, help="Unban a user.")
|
||||
@ldr.add(f"{bot_name}(,|) unban", moderation=True, simple_pattern=True, hide_help=True)
|
||||
async def unban_user(event):
|
||||
if event.args.lower() == "me" and not event.has_user_entities:
|
||||
await event.reply("You probably aren't banned.")
|
||||
return
|
||||
|
||||
if not (await event.client.get_permissions(event.chat, "me")).ban_users:
|
||||
await event.reply("I can't unban users in this chat.")
|
||||
return
|
||||
@ -135,7 +139,7 @@ async def mute_user(event):
|
||||
if time_match := time_regex.search(event.args):
|
||||
event.args = time_regex.sub("", event.args).strip()
|
||||
|
||||
if time_regex.sub("", event.args).strip().lower() == "me":
|
||||
if time_regex.sub("", event.args).strip().lower() == "me" and not event.has_user_entities:
|
||||
self_harm = True
|
||||
user_to_mute = await event.get_sender()
|
||||
else:
|
||||
@ -186,7 +190,7 @@ async def mute_user(event):
|
||||
@ldr.add("unmute", moderation=True, help="Unmute a user.")
|
||||
@ldr.add(f"{bot_name}(,|) unmute", moderation=True, simple_pattern=True, hide_help=True)
|
||||
async def unmute_user(event):
|
||||
if event.args.lower() == "me":
|
||||
if event.args.lower() == "me" and not event.has_user_entities:
|
||||
await event.reply("You probably aren't muted.")
|
||||
return
|
||||
|
||||
|
@ -106,7 +106,7 @@ async def bot_repo(event):
|
||||
await event.reply("https://github.com/Nick80835/microbot/tree/bot")
|
||||
|
||||
|
||||
@ldr.add("disable", admin=True, help="Disables commands in the current chat, requires admin.")
|
||||
@ldr.add("disable", admin=True, no_private=True, help="Disables commands in the current chat, requires admin.")
|
||||
async def disable_command(event):
|
||||
if event.args:
|
||||
for command in ldr.command_handler.incoming_commands:
|
||||
@ -124,7 +124,7 @@ async def disable_command(event):
|
||||
await event.reply("Specify a command to disable!")
|
||||
|
||||
|
||||
@ldr.add("enable", admin=True, help="Enables commands in the current chat, requires admin.")
|
||||
@ldr.add("enable", admin=True, no_private=True, help="Enables commands in the current chat, requires admin.")
|
||||
async def enable_command(event):
|
||||
if event.args:
|
||||
for command in ldr.command_handler.incoming_commands:
|
||||
@ -138,7 +138,7 @@ async def enable_command(event):
|
||||
await event.reply("Specify a command to enable!")
|
||||
|
||||
|
||||
@ldr.add("showdisabled", admin=True, help="Shows disabled commands in the current chat.")
|
||||
@ldr.add("showdisabled", admin=True, no_private=True, help="Shows disabled commands in the current chat.")
|
||||
async def show_disabled(event):
|
||||
disabled_list = event.chat_db.disabled_commands
|
||||
|
||||
@ -149,7 +149,7 @@ async def show_disabled(event):
|
||||
await event.reply(f"There are no disabled commands in **{event.chat.id}**!")
|
||||
|
||||
|
||||
@ldr.add("nsfw", admin=True, help="Enables or disables NSFW commands for a chat, requires admin.")
|
||||
@ldr.add("nsfw", admin=True, no_private=True, help="Enables or disables NSFW commands for a chat, requires admin.")
|
||||
async def nsfw_toggle(event):
|
||||
if event.args.lower() not in ("on", "off"):
|
||||
if event.chat_db.nsfw_enabled:
|
||||
@ -168,7 +168,7 @@ async def nsfw_toggle(event):
|
||||
await event.reply("NSFW commands disabled for this chat!")
|
||||
|
||||
|
||||
@ldr.add("spoilernsfw", admin=True, help="Enables or disables spoilering NSFW media for a chat, requires admin.")
|
||||
@ldr.add("spoilernsfw", admin=True, no_private=True, help="Enables or disables spoilering NSFW media for a chat, requires admin.")
|
||||
async def spoiler_nsfw_toggle(event):
|
||||
if event.args.lower() not in ("on", "off"):
|
||||
if event.chat_db.spoiler_nsfw:
|
||||
@ -187,7 +187,7 @@ async def spoiler_nsfw_toggle(event):
|
||||
await event.reply("NSFW spoilers disabled for this chat!")
|
||||
|
||||
|
||||
@ldr.add("fun", admin=True, help="Enables or disables fun commands for a chat, requires admin.")
|
||||
@ldr.add("fun", admin=True, no_private=True, help="Enables or disables fun commands for a chat, requires admin.")
|
||||
async def fun_toggle(event):
|
||||
if event.args.lower() not in ("on", "off"):
|
||||
if event.chat_db.fun_enabled:
|
||||
@ -206,7 +206,7 @@ async def fun_toggle(event):
|
||||
await event.reply("Fun commands disabled for this chat!")
|
||||
|
||||
|
||||
@ldr.add("modmode", sudo=True, admin=True, help="Enables or disables moderation commands for a chat, requires admin.")
|
||||
@ldr.add("modmode", sudo=True, admin=True, no_private=True, help="Enables or disables moderation commands for a chat, requires admin.")
|
||||
async def modmode_toggle(event):
|
||||
if event.args.lower() not in ("on", "off"):
|
||||
if event.chat_db.modmode_enabled:
|
||||
|
Loading…
x
Reference in New Issue
Block a user