2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-22 01:58:17 +00:00

Compare commits

...

5 Commits

Author SHA1 Message Date
Nick80835
0119698e81 improve/fix moderation mentions 2023-10-21 20:35:06 -04:00
Nick80835
4aae840a8c add simple in-memory cmd use stat 2023-10-21 19:58:41 -04:00
Nick80835
ffea3fe581 mark admin commands as no_private 2023-10-21 19:48:07 -04:00
Nick80835
30238f088a clarify prefix doc 2023-10-21 19:40:45 -04:00
Nick80835
c799f34f89 improve type hinting/doc a bit 2023-10-21 19:30:14 -04:00
8 changed files with 94 additions and 28 deletions

View File

@ -4,13 +4,15 @@ from logging import INFO, basicConfig, getLogger
from time import time
import telethon
from telethon import TelegramClient
from telethon.errors.rpcerrorlist import (AccessTokenExpiredError,
AccessTokenInvalidError,
TokenInvalidError)
from telethon.network.connection.tcpabridged import \
ConnectionTcpAbridged as CTA
from .custom import ExtendedEvent
from .custom import (ExtendedCallbackQuery, ExtendedInlineQuery,
ExtendedNewMessage)
from .loader import Loader
from .settings import Settings
@ -26,9 +28,9 @@ loop = asyncio.get_event_loop()
class MicroBot():
settings = Settings()
client = None
logger = None
loader = None
logger = logger
client: TelegramClient
loader: Loader
def __init__(self):
loop.run_until_complete(self._initialize_bot())
@ -37,7 +39,7 @@ class MicroBot():
global ldr
try:
self.client = await telethon.TelegramClient(
self.client = await TelegramClient(
self.settings.get_config("session_name", "bot0") or "bot0",
self.settings.get_config("api_id"),
self.settings.get_config("api_hash"),
@ -76,7 +78,10 @@ class MicroBot():
sys.exit(0)
telethon.events.NewMessage.Event = ExtendedEvent
telethon.events.NewMessage.Event = ExtendedNewMessage
telethon.events.CallbackQuery.Event = ExtendedCallbackQuery
telethon.events.InlineQuery.Event = ExtendedInlineQuery
micro_bot = MicroBot()
try:

View File

@ -6,6 +6,7 @@ class Command:
self.module = func.__module__.split(".")[-1]
self.function = func
self.data = {}
self.uses = 0
self.pattern = args.get("pattern")
self.simple_pattern = args.get("simple_pattern", False)

View File

@ -8,6 +8,11 @@ from telethon import events
from telethon.errors.rpcerrorlist import (ChatAdminRequiredError,
ChatWriteForbiddenError)
from ubot.command import CallbackQueryCommand, Command
from ubot.custom import (ExtendedCallbackQuery, ExtendedInlineQuery,
ExtendedNewMessage)
from ubot.database import ChatWrapper
from .fixes import inline_photos
@ -39,7 +44,7 @@ class CommandHandler():
if not isinstance(exception, (ChatAdminRequiredError, ChatWriteForbiddenError)):
await event.client.send_message(int(self.settings.get_list("owner_id")[0]), str(format_exc()))
async def handle_incoming(self, event):
async def handle_incoming(self, event: ExtendedNewMessage):
chat_db = self.db.get_chat((await event.get_chat()).id)
chat_prefix = chat_db.prefix
@ -57,6 +62,8 @@ class CommandHandler():
pattern_match = search(self.pattern_template.format(f"({'|'.join([escape(i) for i in prefix_list])})", command.pattern + command.pattern_extra, self.micro_bot.me.username), event.raw_text, IGNORECASE|DOTALL)
if pattern_match:
command.uses += 1
if command.moderation and not chat_db.modmode_enabled:
continue
@ -100,7 +107,7 @@ class CommandHandler():
await self.execute_command(event, command)
async def handle_inline(self, event):
async def handle_inline(self, event: ExtendedInlineQuery):
for command in self.inline_photo_commands:
pattern_match = search(self.simple_pattern_template.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)
@ -123,7 +130,7 @@ class CommandHandler():
await self.fallback_inline(event)
async def handle_inline_photo(self, event, pattern_match, command):
async def handle_inline_photo(self, event: ExtendedInlineQuery, pattern_match, command):
builder = event.builder
event.pattern_match = pattern_match
event.args = pattern_match.groups()[-1]
@ -166,7 +173,7 @@ class CommandHandler():
except:
print_exc()
async def handle_inline_article(self, event, pattern_match, command):
async def handle_inline_article(self, event: ExtendedInlineQuery, pattern_match, command):
builder = event.builder
event.pattern_match = pattern_match
event.args = pattern_match.groups()[-1]
@ -204,7 +211,7 @@ class CommandHandler():
except:
print_exc()
async def handle_callback_query(self, event):
async def handle_callback_query(self, event: ExtendedCallbackQuery):
data_str = event.data.decode("utf-8")
data_id = data_str.split("*")[0]
data_data = data_str.removeprefix(data_id + "*")
@ -283,7 +290,7 @@ class CommandHandler():
# returns True if the command can be used, False if not, and an optional error string together in a tuple
# for normal commands, this will be passed to event.reply; for callback queries this will call event.answer
async def check_privs(self, event, command, chat_db = None, callback_query = False) -> tuple[bool, str|None]:
async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None, callback_query = False) -> tuple[bool, str|None]:
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
return (False, None)
@ -301,10 +308,10 @@ class CommandHandler():
if event.is_private or not (await event.client.get_permissions(event.chat, event.sender_id)).is_admin and not self.is_sudo(event) and not self.is_owner(event):
return (False, None if command.silent_bail else "You lack the permissions to use that command!")
if not callback_query and event.chat and command.nsfw and not chat_db.nsfw_enabled:
if not callback_query and event.chat and command.nsfw and (chat_db and not chat_db.nsfw_enabled):
return (False, None if command.silent_bail else command.nsfw_warning or "NSFW commands are disabled in this chat!")
if not callback_query and event.chat and command.fun and not chat_db.fun_enabled:
if not callback_query and event.chat and command.fun and (chat_db and not chat_db.fun_enabled):
return (False, None)
return (True, None)

View File

@ -1,10 +1,34 @@
from re import Match
from typing import Any
from telethon.events.callbackquery import CallbackQuery
from telethon.events.inlinequery import InlineQuery
from telethon.events.newmessage import NewMessage
from telethon.tl.types import (DocumentAttributeFilename,
DocumentAttributeImageSize,
DocumentAttributeSticker)
DocumentAttributeSticker,
MessageEntityMentionName)
from ubot.command import (CallbackQueryCommand, Command, InlineArticleCommand,
InlinePhotoCommand)
from ubot.database import ChatWrapper
class ExtendedEvent(NewMessage.Event):
class ExtendedNewMessage(NewMessage.Event):
pattern_match: Match[str] # pattern match as returned by re.search when it's used in the command handler
chat_db: ChatWrapper # database reference for the chat this command was executed in
object: Command # the object constructed when the command associated with this event was added
command: str # the base command with no prefix, no args and no other_args; the whole pattern if raw_pattern is used
prefix: str # prefix used to call this command, such as "/" or "g."; not set if simple_pattern/raw_pattern is used
extra: Any # any object you set to extra when registering the command associated with this event
args: str # anything after the command itself and any groups caught in other_args, such as booru tags
other_args: tuple # any groups between the args group and the command itself
nsfw_disabled: bool # only set if pass_nsfw is True; this value is the opposite of nsfw_enabled in chat_db
@property
def has_user_entities(self) -> bool:
return any([i for i in self.entities if isinstance(i, MessageEntityMentionName)]) if self.entities else False
async def get_text(self, return_msg=False, default=""):
if self.args:
if return_msg:
@ -58,3 +82,23 @@ class ExtendedEvent(NewMessage.Event):
return await self.message.respond(*args, **kwargs|{"reply_to": self.reply_to.reply_to_msg_id})
return await self.message.respond(*args, **kwargs)
class ExtendedCallbackQuery(CallbackQuery.Event):
chat_db: ChatWrapper|None
object: CallbackQueryCommand
command: str
extra: Any
args: str
class ExtendedInlineQuery(InlineQuery.Event):
pattern_match: Match[str]
parse_mode: str
object: InlineArticleCommand|InlinePhotoCommand
command: str
extra: Any
args: str
other_args: tuple
nsfw_disabled: bool
link_preview: bool

View File

@ -4,7 +4,7 @@ from telethon.tl.types import Channel, Chat, MessageEntityMentionName
async def get_user(event, allow_channel=False):
if mention_entities := [i for i in event.get_entities_text() if isinstance(i, MessageEntityMentionName)]:
if mention_entities := [i for i in event.entities if isinstance(i, MessageEntityMentionName)] if event.entities else False:
if len(mention_entities) > 1:
await event.reply("You provided too many arguments!")
return

View File

@ -160,6 +160,11 @@ async def dbstat(event):
)
@ldr.add("usage", owner=True, hide_help=True)
async def usagestat(event):
await event.reply("\n".join(f"{command.pattern}: {command.uses}" for command in ldr.command_handler.incoming_commands if command.uses))
@ldr.add("shutdown", pattern_extra="(f|)", owner=True, hide_help=True)
async def shutdown(event):
await event.reply("Goodbye…")

View File

@ -18,7 +18,7 @@ async def kick_user(event):
await event.reply("I can't kick users in this chat.")
return
if time_regex.sub("", event.args).strip().lower() == "me":
if time_regex.sub("", event.args).strip().lower() == "me" and not event.has_user_entities:
self_harm = True
user_to_kick = await event.get_sender()
else:
@ -63,7 +63,7 @@ async def ban_user(event):
if time_match := time_regex.search(event.args):
event.args = time_regex.sub("", event.args).strip()
if time_regex.sub("", event.args).strip().lower() == "me":
if time_regex.sub("", event.args).strip().lower() == "me" and not event.has_user_entities:
await event.reply("I don't think I should do that…")
return
@ -105,6 +105,10 @@ async def ban_user(event):
@ldr.add("unban", moderation=True, help="Unban a user.")
@ldr.add(f"{bot_name}(,|) unban", moderation=True, simple_pattern=True, hide_help=True)
async def unban_user(event):
if event.args.lower() == "me" and not event.has_user_entities:
await event.reply("You probably aren't banned.")
return
if not (await event.client.get_permissions(event.chat, "me")).ban_users:
await event.reply("I can't unban users in this chat.")
return
@ -135,7 +139,7 @@ async def mute_user(event):
if time_match := time_regex.search(event.args):
event.args = time_regex.sub("", event.args).strip()
if time_regex.sub("", event.args).strip().lower() == "me":
if time_regex.sub("", event.args).strip().lower() == "me" and not event.has_user_entities:
self_harm = True
user_to_mute = await event.get_sender()
else:
@ -186,7 +190,7 @@ async def mute_user(event):
@ldr.add("unmute", moderation=True, help="Unmute a user.")
@ldr.add(f"{bot_name}(,|) unmute", moderation=True, simple_pattern=True, hide_help=True)
async def unmute_user(event):
if event.args.lower() == "me":
if event.args.lower() == "me" and not event.has_user_entities:
await event.reply("You probably aren't muted.")
return

View File

@ -106,7 +106,7 @@ async def bot_repo(event):
await event.reply("https://github.com/Nick80835/microbot/tree/bot")
@ldr.add("disable", admin=True, help="Disables commands in the current chat, requires admin.")
@ldr.add("disable", admin=True, no_private=True, help="Disables commands in the current chat, requires admin.")
async def disable_command(event):
if event.args:
for command in ldr.command_handler.incoming_commands:
@ -124,7 +124,7 @@ async def disable_command(event):
await event.reply("Specify a command to disable!")
@ldr.add("enable", admin=True, help="Enables commands in the current chat, requires admin.")
@ldr.add("enable", admin=True, no_private=True, help="Enables commands in the current chat, requires admin.")
async def enable_command(event):
if event.args:
for command in ldr.command_handler.incoming_commands:
@ -138,7 +138,7 @@ async def enable_command(event):
await event.reply("Specify a command to enable!")
@ldr.add("showdisabled", admin=True, help="Shows disabled commands in the current chat.")
@ldr.add("showdisabled", admin=True, no_private=True, help="Shows disabled commands in the current chat.")
async def show_disabled(event):
disabled_list = event.chat_db.disabled_commands
@ -149,7 +149,7 @@ async def show_disabled(event):
await event.reply(f"There are no disabled commands in **{event.chat.id}**!")
@ldr.add("nsfw", admin=True, help="Enables or disables NSFW commands for a chat, requires admin.")
@ldr.add("nsfw", admin=True, no_private=True, help="Enables or disables NSFW commands for a chat, requires admin.")
async def nsfw_toggle(event):
if event.args.lower() not in ("on", "off"):
if event.chat_db.nsfw_enabled:
@ -168,7 +168,7 @@ async def nsfw_toggle(event):
await event.reply("NSFW commands disabled for this chat!")
@ldr.add("spoilernsfw", admin=True, help="Enables or disables spoilering NSFW media for a chat, requires admin.")
@ldr.add("spoilernsfw", admin=True, no_private=True, help="Enables or disables spoilering NSFW media for a chat, requires admin.")
async def spoiler_nsfw_toggle(event):
if event.args.lower() not in ("on", "off"):
if event.chat_db.spoiler_nsfw:
@ -187,7 +187,7 @@ async def spoiler_nsfw_toggle(event):
await event.reply("NSFW spoilers disabled for this chat!")
@ldr.add("fun", admin=True, help="Enables or disables fun commands for a chat, requires admin.")
@ldr.add("fun", admin=True, no_private=True, help="Enables or disables fun commands for a chat, requires admin.")
async def fun_toggle(event):
if event.args.lower() not in ("on", "off"):
if event.chat_db.fun_enabled:
@ -206,7 +206,7 @@ async def fun_toggle(event):
await event.reply("Fun commands disabled for this chat!")
@ldr.add("modmode", sudo=True, admin=True, help="Enables or disables moderation commands for a chat, requires admin.")
@ldr.add("modmode", sudo=True, admin=True, no_private=True, help="Enables or disables moderation commands for a chat, requires admin.")
async def modmode_toggle(event):
if event.args.lower() not in ("on", "off"):
if event.chat_db.modmode_enabled: