mirror of
https://github.com/Nick80835/microbot
synced 2025-09-01 06:55:51 +00:00
stuff and things
This commit is contained in:
@@ -10,16 +10,17 @@ from .fixes import inline_photos
|
|||||||
|
|
||||||
|
|
||||||
class CommandHandler():
|
class CommandHandler():
|
||||||
def __init__(self, client, logger, settings, loader):
|
pattern_template = "(?is)^{0}({1})(?: |$|_|@{2}(?: |$|_))(.*)"
|
||||||
|
inline_pattern_template = "(?is)^({0})(?: |$|_)(.*)"
|
||||||
|
raw_pattern_template = "(?is){0}"
|
||||||
|
|
||||||
|
incoming_commands = []
|
||||||
|
inline_photo_commands = []
|
||||||
|
inline_article_commands = []
|
||||||
|
callback_queries = []
|
||||||
|
|
||||||
|
def __init__(self, client, settings, loader):
|
||||||
self.username = client.loop.run_until_complete(client.get_me()).username
|
self.username = client.loop.run_until_complete(client.get_me()).username
|
||||||
self.pattern_template = "(?is)^{0}({1})(?: |$|_|@{2}(?: |$|_))(.*)"
|
|
||||||
self.inline_pattern_template = "(?is)^({0})(?: |$|_)(.*)"
|
|
||||||
self.raw_pattern_template = "(?is){0}"
|
|
||||||
self.incoming_commands = []
|
|
||||||
self.inline_photo_commands = []
|
|
||||||
self.inline_article_commands = []
|
|
||||||
self.callback_queries = []
|
|
||||||
self.logger = logger
|
|
||||||
self.settings = settings
|
self.settings = settings
|
||||||
self.loader = loader
|
self.loader = loader
|
||||||
client.add_event_handler(self.handle_incoming, events.NewMessage(incoming=True))
|
client.add_event_handler(self.handle_incoming, events.NewMessage(incoming=True))
|
||||||
@@ -42,20 +43,10 @@ class CommandHandler():
|
|||||||
print(f"Attempted command ({event.raw_text}) from blacklisted ID {event.from_id}")
|
print(f"Attempted command ({event.raw_text}) from blacklisted ID {event.from_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
if value["owner"] and not self.is_owner(event):
|
if not await self.check_privs(event, value):
|
||||||
print(f"Attempted owner command ({event.raw_text}) from ID {event.from_id}")
|
return
|
||||||
continue
|
|
||||||
elif value["sudo"] and not self.is_sudo(event) and not self.is_owner(event):
|
|
||||||
print(f"Attempted sudo command ({event.raw_text}) from ID {event.from_id}")
|
|
||||||
continue
|
|
||||||
elif value["admin"] and not await self.is_admin(event) and not self.is_sudo(event) and not self.is_owner(event):
|
|
||||||
print(f"Attempted admin command ({event.raw_text}) from ID {event.from_id}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
if value["nsfw"] and str(event.chat.id) in self.settings.get_list("nsfw_blacklist"):
|
if value["pass_nsfw"]:
|
||||||
print(f"Attempted NSFW command ({event.raw_text}) in blacklisted chat ({event.chat.id}) from ID {event.from_id}")
|
|
||||||
continue
|
|
||||||
elif value["pass_nsfw"]:
|
|
||||||
event.nsfw_disabled = str(event.chat.id) in self.settings.get_list("nsfw_blacklist")
|
event.nsfw_disabled = str(event.chat.id) in self.settings.get_list("nsfw_blacklist")
|
||||||
|
|
||||||
event.pattern_match = pattern_match
|
event.pattern_match = pattern_match
|
||||||
@@ -64,50 +55,7 @@ class CommandHandler():
|
|||||||
event.command = pattern_match.groups()[1]
|
event.command = pattern_match.groups()[1]
|
||||||
event.extras = value["extras"]
|
event.extras = value["extras"]
|
||||||
|
|
||||||
try:
|
await self.execute_command(event, value)
|
||||||
if value["locking"]:
|
|
||||||
if value["lockreason"]:
|
|
||||||
await event.reply(f"That command is currently locked: {value['lockreason']}")
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
if value["chance"]:
|
|
||||||
if randint(0, 100) <= value["chance"]:
|
|
||||||
value["lockreason"] = f"In use by **{event.from_id}** (`{event.raw_text}`)"
|
|
||||||
await value["function"](event)
|
|
||||||
value["lockreason"] = None
|
|
||||||
else:
|
|
||||||
value["lockreason"] = f"In use by **{event.from_id}** (`{event.raw_text}`)"
|
|
||||||
await value["function"](event)
|
|
||||||
value["lockreason"] = None
|
|
||||||
elif value["userlocking"]:
|
|
||||||
if event.from_id in value["lockedusers"]:
|
|
||||||
await event.reply(f"Please don't spam that command.")
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
if value["chance"]:
|
|
||||||
if randint(0, 100) <= value["chance"]:
|
|
||||||
value["lockedusers"].append(event.from_id)
|
|
||||||
await value["function"](event)
|
|
||||||
value["lockedusers"].remove(event.from_id)
|
|
||||||
else:
|
|
||||||
value["lockedusers"].append(event.from_id)
|
|
||||||
await value["function"](event)
|
|
||||||
value["lockedusers"].remove(event.from_id)
|
|
||||||
else:
|
|
||||||
if value["chance"]:
|
|
||||||
if randint(0, 100) <= value["chance"]:
|
|
||||||
await value["function"](event)
|
|
||||||
else:
|
|
||||||
await value["function"](event)
|
|
||||||
except Exception as exception:
|
|
||||||
value["lockreason"] = None
|
|
||||||
|
|
||||||
if event.from_id in value["lockedusers"]:
|
|
||||||
value["lockedusers"].remove(event.from_id)
|
|
||||||
|
|
||||||
self.logger.warn(f"{value['function'].__name__} - {exception}")
|
|
||||||
await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
|
|
||||||
raise exception
|
|
||||||
|
|
||||||
async def handle_inline(self, event):
|
async def handle_inline(self, event):
|
||||||
for value in self.inline_photo_commands:
|
for value in self.inline_photo_commands:
|
||||||
@@ -204,7 +152,6 @@ class CommandHandler():
|
|||||||
try:
|
try:
|
||||||
await value["function"](event)
|
await value["function"](event)
|
||||||
except Exception as exception:
|
except Exception as exception:
|
||||||
self.logger.warn(f"{value['function'].__name__} - {exception}")
|
|
||||||
await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
|
await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
|
||||||
raise exception
|
raise exception
|
||||||
|
|
||||||
@@ -222,25 +169,66 @@ class CommandHandler():
|
|||||||
except:
|
except:
|
||||||
return
|
return
|
||||||
|
|
||||||
def is_owner(self, event):
|
async def execute_command(self, event, value):
|
||||||
if str(event.from_id) in self.settings.get_list("owner_id"):
|
try:
|
||||||
return True
|
if value["locking"]:
|
||||||
else:
|
if value["lockreason"]:
|
||||||
|
await event.reply(f"That command is currently locked: {value['lockreason']}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if value["chance"] and randint(0, 100) <= value["chance"] or not value["chance"]:
|
||||||
|
value["lockreason"] = f"In use by **{event.from_id}** (`{event.raw_text}`)"
|
||||||
|
await value["function"](event)
|
||||||
|
value["lockreason"] = None
|
||||||
|
elif value["userlocking"]:
|
||||||
|
if event.from_id in value["lockedusers"]:
|
||||||
|
await event.reply(f"Please don't spam that command.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if value["chance"] and randint(0, 100) <= value["chance"] or not value["chance"]:
|
||||||
|
value["lockedusers"].append(event.from_id)
|
||||||
|
await value["function"](event)
|
||||||
|
value["lockedusers"].remove(event.from_id)
|
||||||
|
else:
|
||||||
|
if value["chance"] and randint(0, 100) <= value["chance"] or not value["chance"]:
|
||||||
|
await value["function"](event)
|
||||||
|
except Exception as exception:
|
||||||
|
value["lockreason"] = None
|
||||||
|
|
||||||
|
if event.from_id in value["lockedusers"]:
|
||||||
|
value["lockedusers"].remove(event.from_id)
|
||||||
|
|
||||||
|
await event.reply(f"`An error occurred in {value['function'].__name__}: {exception}`")
|
||||||
|
raise exception
|
||||||
|
|
||||||
|
async def check_privs(self, event, value):
|
||||||
|
if value["owner"] and not self.is_owner(event):
|
||||||
|
print(f"Attempted owner command ({event.raw_text}) from ID {event.from_id}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def is_sudo(self, event):
|
if value["sudo"] and not self.is_sudo(event) and not self.is_owner(event):
|
||||||
if str(event.from_id) in self.settings.get_list("sudo_users"):
|
print(f"Attempted sudo command ({event.raw_text}) from ID {event.from_id}")
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
if value["admin"] and not await self.is_admin(event) and not self.is_sudo(event) and not self.is_owner(event):
|
||||||
|
print(f"Attempted admin command ({event.raw_text}) from ID {event.from_id}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if value["nsfw"] and str(event.chat.id) in self.settings.get_list("nsfw_blacklist"):
|
||||||
|
print(f"Attempted NSFW command ({event.raw_text}) in blacklisted chat ({event.chat.id}) from ID {event.from_id}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_owner(self, event):
|
||||||
|
return bool(str(event.from_id) in self.settings.get_list("owner_id"))
|
||||||
|
|
||||||
|
def is_sudo(self, event):
|
||||||
|
return bool(str(event.from_id) in self.settings.get_list("sudo_users"))
|
||||||
|
|
||||||
async def is_admin(self, event):
|
async def is_admin(self, event):
|
||||||
channel_participant = await event.client(functions.channels.GetParticipantRequest(event.chat, event.from_id))
|
channel_participant = await event.client(functions.channels.GetParticipantRequest(event.chat, event.from_id))
|
||||||
|
return bool(isinstance(channel_participant.participant, (types.ChannelParticipantAdmin, types.ChannelParticipantCreator)))
|
||||||
if isinstance(channel_participant.participant, (types.ChannelParticipantAdmin, types.ChannelParticipantCreator)):
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def is_blacklisted(self, event, inline=False):
|
def is_blacklisted(self, event, inline=False):
|
||||||
if inline:
|
if inline:
|
||||||
@@ -248,7 +236,4 @@ class CommandHandler():
|
|||||||
else:
|
else:
|
||||||
user_id = event.from_id
|
user_id = event.from_id
|
||||||
|
|
||||||
if str(user_id) in self.settings.get_list("blacklisted_users"):
|
return bool(str(user_id) in self.settings.get_list("blacklisted_users"))
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
@@ -12,17 +12,19 @@ from .command_handler import CommandHandler
|
|||||||
|
|
||||||
|
|
||||||
class Loader():
|
class Loader():
|
||||||
|
aioclient = ClientSession()
|
||||||
|
thread_pool = ThreadPoolExecutor()
|
||||||
|
|
||||||
|
help_dict = {}
|
||||||
|
help_hidden_dict = {}
|
||||||
|
loaded_modules = []
|
||||||
|
all_modules = []
|
||||||
|
|
||||||
def __init__(self, client, logger, settings):
|
def __init__(self, client, logger, settings):
|
||||||
self.loaded_modules = []
|
|
||||||
self.all_modules = []
|
|
||||||
self.client = client
|
self.client = client
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
self.settings = settings
|
self.settings = settings
|
||||||
self.command_handler = CommandHandler(client, logger, settings, self)
|
self.command_handler = CommandHandler(client, settings, self)
|
||||||
self.help_dict = {}
|
|
||||||
self.help_hidden_dict = {}
|
|
||||||
self.aioclient = ClientSession()
|
|
||||||
self.thread_pool = ThreadPoolExecutor()
|
|
||||||
|
|
||||||
def load_all_modules(self):
|
def load_all_modules(self):
|
||||||
self._find_all_modules()
|
self._find_all_modules()
|
||||||
|
@@ -17,13 +17,12 @@ if sys.version_info[0] < 3 or sys.version_info[1] < 6:
|
|||||||
|
|
||||||
|
|
||||||
class MicroBot():
|
class MicroBot():
|
||||||
def __init__(self):
|
client = None
|
||||||
self.client = None
|
settings = Settings()
|
||||||
self.settings = Settings()
|
logger = None
|
||||||
self.logger = None
|
loader = None
|
||||||
self.loader = None
|
|
||||||
|
|
||||||
def start_microbot(self):
|
def __init__(self):
|
||||||
self.start_logger()
|
self.start_logger()
|
||||||
self.start_client()
|
self.start_client()
|
||||||
self.start_loader()
|
self.start_loader()
|
||||||
@@ -84,7 +83,6 @@ class MicroBot():
|
|||||||
|
|
||||||
|
|
||||||
micro_bot = MicroBot()
|
micro_bot = MicroBot()
|
||||||
micro_bot.start_microbot()
|
|
||||||
ldr = micro_bot.loader
|
ldr = micro_bot.loader
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@@ -4,9 +4,8 @@ from configparser import SafeConfigParser
|
|||||||
|
|
||||||
|
|
||||||
class Settings():
|
class Settings():
|
||||||
def __init__(self):
|
config = SafeConfigParser()
|
||||||
self.config = SafeConfigParser()
|
config.read("settings.ini")
|
||||||
self.config.read("settings.ini")
|
|
||||||
|
|
||||||
def write_changes(self):
|
def write_changes(self):
|
||||||
with open('settings.ini', 'w') as config_file:
|
with open('settings.ini', 'w') as config_file:
|
||||||
|
Reference in New Issue
Block a user