mirror of
https://github.com/Nick80835/microbot
synced 2025-08-22 01:58:17 +00:00
Compare commits
5 Commits
87f5d7657d
...
2b84d937ac
Author | SHA1 | Date | |
---|---|---|---|
|
2b84d937ac | ||
|
b20c67d220 | ||
|
6443a54d97 | ||
|
f0e257c59b | ||
|
073c8ae47a |
@ -25,6 +25,9 @@ logger = getLogger(__name__)
|
||||
startup_time = time()
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
client: TelegramClient
|
||||
ldr: Loader
|
||||
|
||||
|
||||
class MicroBot():
|
||||
settings = Settings()
|
||||
@ -36,6 +39,7 @@ class MicroBot():
|
||||
loop.run_until_complete(self._initialize_bot())
|
||||
|
||||
async def _initialize_bot(self):
|
||||
global client
|
||||
global ldr
|
||||
|
||||
try:
|
||||
@ -47,14 +51,13 @@ class MicroBot():
|
||||
).start(
|
||||
bot_token=self.settings.get_config("bot_token")
|
||||
)
|
||||
|
||||
self.me = await self.client.get_me()
|
||||
except (TokenInvalidError, AccessTokenExpiredError, AccessTokenInvalidError):
|
||||
logger.error("The bot token provided is invalid, exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
self.me = await self.client.get_me()
|
||||
self.loader = Loader(self)
|
||||
client = self.client
|
||||
ldr = self.loader
|
||||
self.loader.load_all_modules()
|
||||
logger.info("Bot successfully started.")
|
||||
|
@ -2,11 +2,15 @@ from types import FunctionType
|
||||
|
||||
|
||||
class Command:
|
||||
mod_cooldown_chats = []
|
||||
locked_users = []
|
||||
lock_reason: str|None = None
|
||||
data = {}
|
||||
uses = 0
|
||||
|
||||
def __init__(self, func: FunctionType, args: dict):
|
||||
self.module = func.__module__.split(".")[-1]
|
||||
self.function = func
|
||||
self.data = {}
|
||||
self.uses = 0
|
||||
|
||||
self.pattern = args.get("pattern")
|
||||
self.simple_pattern = args.get("simple_pattern", False)
|
||||
@ -16,7 +20,6 @@ class Command:
|
||||
self.help = args.get("help", None) or func.__doc__
|
||||
self.hide_help = args.get("hide_help", False)
|
||||
self.moderation = args.get("moderation", False)
|
||||
self.mod_cooldown_chats = []
|
||||
self.owner = args.get("owner", False)
|
||||
self.sudo = args.get("sudo", False)
|
||||
self.admin = args.get("admin", False)
|
||||
@ -24,9 +27,7 @@ class Command:
|
||||
self.nsfw_warning = args.get("nsfw_warning", None)
|
||||
self.pass_nsfw = args.get("pass_nsfw", False)
|
||||
self.locking = args.get("locking", False)
|
||||
self.lock_reason = None
|
||||
self.user_locking = args.get("userlocking", False)
|
||||
self.locked_users = []
|
||||
self.chance = args.get("chance", None)
|
||||
self.fun = args.get("fun", False)
|
||||
self.not_disableable = args.get("no_disable", False) or self.owner or self.sudo or self.admin
|
||||
@ -61,10 +62,11 @@ class InlineArticleCommand:
|
||||
|
||||
|
||||
class CallbackQueryCommand:
|
||||
data = {}
|
||||
|
||||
def __init__(self, func: FunctionType, args: dict):
|
||||
self.module = func.__module__.split(".")[-1]
|
||||
self.function = func
|
||||
self.data = {}
|
||||
|
||||
self.data_id = args.get("data_id")
|
||||
self.extra = args.get("extra", None)
|
||||
|
@ -113,7 +113,7 @@ class CommandHandler():
|
||||
pattern_match = search(SIMPLE_PATTERN_TEMPLATE.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)
|
||||
|
||||
if pattern_match:
|
||||
if self.is_blacklisted(event, True) and not self.is_owner(event) and not self.is_sudo(event):
|
||||
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
|
||||
return
|
||||
|
||||
await self.handle_inline_photo(event, pattern_match, command)
|
||||
@ -123,7 +123,7 @@ class CommandHandler():
|
||||
pattern_match = search(SIMPLE_PATTERN_TEMPLATE.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)
|
||||
|
||||
if pattern_match:
|
||||
if self.is_blacklisted(event, True) and not self.is_owner(event) and not self.is_sudo(event):
|
||||
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
|
||||
return
|
||||
|
||||
await self.handle_inline_article(event, pattern_match, command)
|
||||
@ -228,7 +228,7 @@ class CommandHandler():
|
||||
if not event.via_inline:
|
||||
event.chat_db = self.db.get_chat((await event.get_chat()).id)
|
||||
|
||||
if not (priv_resp := await self.check_privs(event, command, event.chat_db, True))[0]:
|
||||
if not (priv_resp := await self.check_privs(event, command, event.chat_db))[0]:
|
||||
await event.answer(priv_resp[1])
|
||||
continue
|
||||
|
||||
@ -252,7 +252,7 @@ class CommandHandler():
|
||||
except:
|
||||
return
|
||||
|
||||
async def execute_command(self, event, command):
|
||||
async def execute_command(self, event: ExtendedNewMessage, command: Command):
|
||||
try:
|
||||
if command.locking:
|
||||
if command.lock_reason:
|
||||
@ -282,8 +282,9 @@ class CommandHandler():
|
||||
command.mod_cooldown_chats.remove(event.chat.id)
|
||||
except:
|
||||
pass
|
||||
elif command.chance and randint(0, 100) <= command.chance:
|
||||
await command.function(event)
|
||||
elif command.chance:
|
||||
if randint(0, 100) <= command.chance:
|
||||
await command.function(event)
|
||||
else:
|
||||
await command.function(event)
|
||||
except Exception as exception:
|
||||
@ -302,12 +303,20 @@ class CommandHandler():
|
||||
|
||||
# returns True if the command can be used, False if not, and an optional error string together in a tuple
|
||||
# for normal commands, this will be passed to event.reply; for callback queries this will call event.answer
|
||||
async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None, callback_query = False) -> tuple[bool, str|None]:
|
||||
async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None) -> tuple[bool, str|None]:
|
||||
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
|
||||
return (False, None)
|
||||
|
||||
if not callback_query and command.no_private and event.is_private:
|
||||
return (False, "That command can't be used in private!")
|
||||
if isinstance(command, Command):
|
||||
if event.chat and chat_db:
|
||||
if command.nsfw and not chat_db.nsfw_enabled:
|
||||
return (False, None if command.silent_bail else command.nsfw_warning or "NSFW commands are disabled in this chat!")
|
||||
|
||||
if command.fun and not chat_db.fun_enabled:
|
||||
return (False, None)
|
||||
|
||||
if event.is_private and command.no_private:
|
||||
return (False, "That command can't be used in private!")
|
||||
|
||||
if command.owner and not self.is_owner(event):
|
||||
return (False, None if command.silent_bail else "You lack the permissions to use that command!")
|
||||
@ -320,24 +329,13 @@ class CommandHandler():
|
||||
if event.is_private or not (await event.client.get_permissions(event.chat, event.sender_id)).is_admin and not self.is_sudo(event) and not self.is_owner(event):
|
||||
return (False, None if command.silent_bail else "You lack the permissions to use that command!")
|
||||
|
||||
if not callback_query and event.chat and command.nsfw and (chat_db and not chat_db.nsfw_enabled):
|
||||
return (False, None if command.silent_bail else command.nsfw_warning or "NSFW commands are disabled in this chat!")
|
||||
|
||||
if not callback_query and event.chat and command.fun and (chat_db and not chat_db.fun_enabled):
|
||||
return (False, None)
|
||||
|
||||
return (True, None)
|
||||
|
||||
def is_owner(self, event):
|
||||
def is_owner(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
|
||||
return str(event.sender_id) in self.settings.get_list("owner_id")
|
||||
|
||||
def is_sudo(self, event):
|
||||
def is_sudo(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
|
||||
return event.sender_id in self.db.sudo_users
|
||||
|
||||
def is_blacklisted(self, event, inline=False):
|
||||
if inline:
|
||||
user_id = event.query.user_id
|
||||
else:
|
||||
user_id = event.sender_id
|
||||
|
||||
return user_id in self.db.blacklisted_users
|
||||
def is_blacklisted(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
|
||||
return event.query.user_id if isinstance(event, ExtendedInlineQuery) else event.sender_id in self.db.blacklisted_users
|
||||
|
@ -33,12 +33,7 @@ TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLoca
|
||||
|
||||
|
||||
def stream_file(file_to_stream: BinaryIO, chunk_size=1024):
|
||||
while True:
|
||||
data_read = file_to_stream.read(chunk_size)
|
||||
|
||||
if not data_read:
|
||||
break
|
||||
|
||||
while data_read := file_to_stream.read(chunk_size):
|
||||
yield data_read
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user