2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-22 01:58:17 +00:00

Compare commits

...

5 Commits

Author SHA1 Message Date
Nick80835
2b84d937ac rework permissions and command values a bit 2023-10-22 19:43:46 -04:00
Nick80835
b20c67d220 fix chance commands 2023-10-22 19:04:15 -04:00
Nick80835
6443a54d97 remove the last while True 2023-10-22 18:58:23 -04:00
Nick80835
f0e257c59b expose client in init alongside ldr 2023-10-22 18:51:36 -04:00
Nick80835
073c8ae47a remove extra get_me that I forgot to remove 2023-10-22 18:19:29 -04:00
4 changed files with 36 additions and 38 deletions

View File

@ -25,6 +25,9 @@ logger = getLogger(__name__)
startup_time = time()
loop = asyncio.get_event_loop()
client: TelegramClient
ldr: Loader
class MicroBot():
settings = Settings()
@ -36,6 +39,7 @@ class MicroBot():
loop.run_until_complete(self._initialize_bot())
async def _initialize_bot(self):
global client
global ldr
try:
@ -47,14 +51,13 @@ class MicroBot():
).start(
bot_token=self.settings.get_config("bot_token")
)
self.me = await self.client.get_me()
except (TokenInvalidError, AccessTokenExpiredError, AccessTokenInvalidError):
logger.error("The bot token provided is invalid, exiting.")
sys.exit(1)
self.me = await self.client.get_me()
self.loader = Loader(self)
client = self.client
ldr = self.loader
self.loader.load_all_modules()
logger.info("Bot successfully started.")

View File

@ -2,11 +2,15 @@ from types import FunctionType
class Command:
mod_cooldown_chats = []
locked_users = []
lock_reason: str|None = None
data = {}
uses = 0
def __init__(self, func: FunctionType, args: dict):
self.module = func.__module__.split(".")[-1]
self.function = func
self.data = {}
self.uses = 0
self.pattern = args.get("pattern")
self.simple_pattern = args.get("simple_pattern", False)
@ -16,7 +20,6 @@ class Command:
self.help = args.get("help", None) or func.__doc__
self.hide_help = args.get("hide_help", False)
self.moderation = args.get("moderation", False)
self.mod_cooldown_chats = []
self.owner = args.get("owner", False)
self.sudo = args.get("sudo", False)
self.admin = args.get("admin", False)
@ -24,9 +27,7 @@ class Command:
self.nsfw_warning = args.get("nsfw_warning", None)
self.pass_nsfw = args.get("pass_nsfw", False)
self.locking = args.get("locking", False)
self.lock_reason = None
self.user_locking = args.get("userlocking", False)
self.locked_users = []
self.chance = args.get("chance", None)
self.fun = args.get("fun", False)
self.not_disableable = args.get("no_disable", False) or self.owner or self.sudo or self.admin
@ -61,10 +62,11 @@ class InlineArticleCommand:
class CallbackQueryCommand:
data = {}
def __init__(self, func: FunctionType, args: dict):
self.module = func.__module__.split(".")[-1]
self.function = func
self.data = {}
self.data_id = args.get("data_id")
self.extra = args.get("extra", None)

View File

@ -113,7 +113,7 @@ class CommandHandler():
pattern_match = search(SIMPLE_PATTERN_TEMPLATE.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)
if pattern_match:
if self.is_blacklisted(event, True) and not self.is_owner(event) and not self.is_sudo(event):
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
return
await self.handle_inline_photo(event, pattern_match, command)
@ -123,7 +123,7 @@ class CommandHandler():
pattern_match = search(SIMPLE_PATTERN_TEMPLATE.format(command.pattern + command.pattern_extra), event.text, IGNORECASE|DOTALL)
if pattern_match:
if self.is_blacklisted(event, True) and not self.is_owner(event) and not self.is_sudo(event):
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
return
await self.handle_inline_article(event, pattern_match, command)
@ -228,7 +228,7 @@ class CommandHandler():
if not event.via_inline:
event.chat_db = self.db.get_chat((await event.get_chat()).id)
if not (priv_resp := await self.check_privs(event, command, event.chat_db, True))[0]:
if not (priv_resp := await self.check_privs(event, command, event.chat_db))[0]:
await event.answer(priv_resp[1])
continue
@ -252,7 +252,7 @@ class CommandHandler():
except:
return
async def execute_command(self, event, command):
async def execute_command(self, event: ExtendedNewMessage, command: Command):
try:
if command.locking:
if command.lock_reason:
@ -282,7 +282,8 @@ class CommandHandler():
command.mod_cooldown_chats.remove(event.chat.id)
except:
pass
elif command.chance and randint(0, 100) <= command.chance:
elif command.chance:
if randint(0, 100) <= command.chance:
await command.function(event)
else:
await command.function(event)
@ -302,11 +303,19 @@ class CommandHandler():
# returns True if the command can be used, False if not, and an optional error string together in a tuple
# for normal commands, this will be passed to event.reply; for callback queries this will call event.answer
async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None, callback_query = False) -> tuple[bool, str|None]:
async def check_privs(self, event, command: Command|CallbackQueryCommand, chat_db: ChatWrapper|None = None) -> tuple[bool, str|None]:
if self.is_blacklisted(event) and not self.is_owner(event) and not self.is_sudo(event):
return (False, None)
if not callback_query and command.no_private and event.is_private:
if isinstance(command, Command):
if event.chat and chat_db:
if command.nsfw and not chat_db.nsfw_enabled:
return (False, None if command.silent_bail else command.nsfw_warning or "NSFW commands are disabled in this chat!")
if command.fun and not chat_db.fun_enabled:
return (False, None)
if event.is_private and command.no_private:
return (False, "That command can't be used in private!")
if command.owner and not self.is_owner(event):
@ -320,24 +329,13 @@ class CommandHandler():
if event.is_private or not (await event.client.get_permissions(event.chat, event.sender_id)).is_admin and not self.is_sudo(event) and not self.is_owner(event):
return (False, None if command.silent_bail else "You lack the permissions to use that command!")
if not callback_query and event.chat and command.nsfw and (chat_db and not chat_db.nsfw_enabled):
return (False, None if command.silent_bail else command.nsfw_warning or "NSFW commands are disabled in this chat!")
if not callback_query and event.chat and command.fun and (chat_db and not chat_db.fun_enabled):
return (False, None)
return (True, None)
def is_owner(self, event):
def is_owner(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
return str(event.sender_id) in self.settings.get_list("owner_id")
def is_sudo(self, event):
def is_sudo(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
return event.sender_id in self.db.sudo_users
def is_blacklisted(self, event, inline=False):
if inline:
user_id = event.query.user_id
else:
user_id = event.sender_id
return user_id in self.db.blacklisted_users
def is_blacklisted(self, event: ExtendedNewMessage|ExtendedInlineQuery) -> bool:
return event.query.user_id if isinstance(event, ExtendedInlineQuery) else event.sender_id in self.db.blacklisted_users

View File

@ -33,12 +33,7 @@ TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLoca
def stream_file(file_to_stream: BinaryIO, chunk_size=1024):
while True:
data_read = file_to_stream.read(chunk_size)
if not data_read:
break
while data_read := file_to_stream.read(chunk_size):
yield data_read