From 310dc7aaa34d111bc82f92addedb5308c526e518 Mon Sep 17 00:00:00 2001 From: Nick80835 Date: Sat, 13 Feb 2021 12:06:21 -0500 Subject: [PATCH] extend events and move a few functions to them --- ubot/__init__.py | 7 +++-- ubot/custom.py | 49 +++++++++++++++++++++++++++++++++++ ubot/loader.py | 53 -------------------------------------- ubot/modules/deepfry.py | 16 +++--------- ubot/modules/evaluation.py | 46 ++++++++++++--------------------- ubot/modules/memes.py | 16 ++++++------ ubot/modules/scrapers.py | 4 +-- 7 files changed, 84 insertions(+), 107 deletions(-) create mode 100644 ubot/custom.py diff --git a/ubot/__init__.py b/ubot/__init__.py index cf03785..dcaeb8a 100644 --- a/ubot/__init__.py +++ b/ubot/__init__.py @@ -1,13 +1,14 @@ import sys from logging import INFO, basicConfig, getLogger -import telethon as tt +import telethon from telethon.errors.rpcerrorlist import (AccessTokenExpiredError, AccessTokenInvalidError, TokenInvalidError) from telethon.network.connection.tcpabridged import \ ConnectionTcpAbridged as CTA +from .custom import ExtendedEvent from .loader import Loader from .settings import Settings @@ -65,7 +66,7 @@ class MicroBot(): def start_client(self): api_key, api_hash, bot_token = self._check_config() - self.client = tt.TelegramClient(self.settings.get_config("session_name", "bot0"), api_key, api_hash, connection=CTA) + self.client = telethon.TelegramClient(self.settings.get_config("session_name", "bot0"), api_key, api_hash, connection=CTA) try: self.client.start(bot_token=bot_token) @@ -83,6 +84,8 @@ class MicroBot(): await self.client.disconnect() +telethon.events.NewMessage.Event = ExtendedEvent + micro_bot = MicroBot() ldr = micro_bot.loader client = micro_bot.client diff --git a/ubot/custom.py b/ubot/custom.py new file mode 100644 index 0000000..6362548 --- /dev/null +++ b/ubot/custom.py @@ -0,0 +1,49 @@ +from telethon.events.newmessage import NewMessage +from telethon.tl.types import (DocumentAttributeFilename, + DocumentAttributeImageSize, + DocumentAttributeSticker) + + +class ExtendedEvent(NewMessage.Event): + async def get_text(self, return_msg=False, default=""): + if self.args: + if return_msg: + return self.args, await self.get_reply_message() if self.is_reply else self.args, self + + return self.args + + if self.is_reply: + reply = await self.get_reply_message() + return reply.raw_text, reply if return_msg else reply.raw_text + + return default, self if return_msg else default + + async def get_image(self, with_reply=True): + if self and self.media: + if self.photo: + return self.photo + + if self.document: + if DocumentAttributeFilename(file_name='AnimatedSticker.tgs') in self.media.document.attributes: + return + + if self.gif or self.video or self.audio or self.voice: + return + + return self.media.document + + if with_reply and self.is_reply: + return await (await self.get_reply_message()).get_image(with_reply=False) + + async def get_sticker(self): + reply = await self.get_reply_message() + + if reply and reply.sticker: + stick_attr = [i.alt for i in reply.sticker.attributes if isinstance(i, DocumentAttributeSticker)] + size_attr = [i for i in reply.sticker.attributes if isinstance(i, DocumentAttributeImageSize)] + + if stick_attr and stick_attr[0]: + return reply.sticker + + if size_attr and ((size_attr[0].w == 512 and size_attr[0].h <= 512) or (size_attr[0].w <= 512 and size_attr[0].h == 512)): + return reply.sticker diff --git a/ubot/loader.py b/ubot/loader.py index f425e66..145a274 100644 --- a/ubot/loader.py +++ b/ubot/loader.py @@ -5,9 +5,6 @@ from importlib import import_module, reload from os.path import basename, dirname, isfile from aiohttp import ClientSession -from telethon.tl.types import (DocumentAttributeFilename, - DocumentAttributeImageSize, - DocumentAttributeSticker) from .cache import Cache from .command import (CallbackQueryCommand, Command, InlineArticleCommand, @@ -135,56 +132,6 @@ class Loader(): def get_cbs_by_func(self, func) -> list: return [i for i in self.command_handler.callback_queries if i.function == func] - async def get_text(self, event, with_reply=True, return_msg=False, default=None): - if event.args: - if return_msg: - if event.is_reply: - return event.args, await event.get_reply_message() - - return event.args, event - - return event.args - elif event.is_reply and with_reply: - reply = await event.get_reply_message() - - if return_msg: - return reply.raw_text, reply - - return reply.raw_text - else: - if return_msg: - return default, event - - return default - - async def get_image(self, event): - if event and event.media: - if event.photo: - return event.photo - elif event.document: - if DocumentAttributeFilename(file_name='AnimatedSticker.tgs') in event.media.document.attributes: - return - if event.gif or event.video or event.audio or event.voice: - return - - return event.media.document - else: - return - else: - return - - async def is_sticker(self, event) -> bool: - if event and event.sticker: - stick_attr = [i.alt for i in event.sticker.attributes if isinstance(i, DocumentAttributeSticker)] - size_attr = [i for i in event.sticker.attributes if isinstance(i, DocumentAttributeImageSize)] - - if stick_attr and stick_attr[0]: - return True - elif size_attr and ((size_attr[0].w == 512 and size_attr[0].h <= 512) or (size_attr[0].w <= 512 and size_attr[0].h == 512)): - return True - - return False - async def run_async(self, function, *args): return await self.client.loop.run_in_executor(self.thread_pool, partial(function, *args)) diff --git a/ubot/modules/deepfry.py b/ubot/modules/deepfry.py index 0c1533e..90fb1de 100644 --- a/ubot/modules/deepfry.py +++ b/ubot/modules/deepfry.py @@ -43,19 +43,11 @@ async def deepfryer(event): except ValueError: frycount = 1 - if event.is_reply: - reply_message = await event.get_reply_message() - data = await ldr.get_image(reply_message) + data = await event.get_image() - if not data: - await event.reply("I can't deep fry that!") - return - else: - data = await ldr.get_image(event) - - if not data: - await event.reply("Reply to an image or sticker or caption an image to deep fry it!") - return + if not data: + await event.reply("Reply to an image or sticker or caption an image to deep fry it!") + return # Download photo (highres) as byte array image = io.BytesIO() diff --git a/ubot/modules/evaluation.py b/ubot/modules/evaluation.py index 6a85a2f..6341779 100644 --- a/ubot/modules/evaluation.py +++ b/ubot/modules/evaluation.py @@ -94,16 +94,14 @@ async def userprofilegetter(event): @ldr.add("stickpng", help="Converts stickers to PNG files.") async def stickertopng(event): - reply = await event.get_reply_message() + sticker = await event.get_sticker() - if reply and await ldr.is_sticker(reply): - sticker_webp_data = reply.sticker - else: + if not sticker: await event.reply("Reply to a sticker to get it as a PNG file!") return sticker_webp_io = io.BytesIO() - await event.client.download_media(sticker_webp_data, sticker_webp_io) + await event.client.download_media(sticker, sticker_webp_io) await event.reply(file=await ldr.run_async(stickertopngsync, sticker_webp_io), force_document=True) @@ -119,16 +117,14 @@ def stickertopngsync(sticker_webp_io): @ldr.add("stickflip", help="Flips stickers horizontally.") async def flipsticker(event): - reply = await event.get_reply_message() + sticker = await event.get_sticker() - if reply and await ldr.is_sticker(reply): - sticker_webp_data = reply.sticker - else: + if not sticker: await event.reply("Reply to a sticker to flip that bitch!") return sticker_webp_io = io.BytesIO() - await event.client.download_media(sticker_webp_data, sticker_webp_io) + await event.client.download_media(sticker, sticker_webp_io) await event.reply(file=await ldr.run_async(flipstickersync, sticker_webp_io)) @@ -145,19 +141,11 @@ def flipstickersync(sticker_webp_io): @ldr.add("stickimg", help="Converts images to sticker sized PNG files.") async def createsticker(event): - if event.is_reply: - reply_message = await event.get_reply_message() - data = await ldr.get_image(reply_message) + data = await event.get_image() - if not data: - await event.reply("Reply to or caption an image to make it sticker-sized!") - return - else: - data = await ldr.get_image(event) - - if not data: - await event.reply("Reply to or caption an image to make it sticker-sized!") - return + if not data: + await event.reply("Reply to or caption an image to make it sticker-sized!") + return image_io = io.BytesIO() await event.client.download_media(data, image_io) @@ -187,7 +175,11 @@ def createstickersync(image_io): @ldr.add("compress") async def compressor(event): - reply = await event.get_reply_message() + sticker = await event.get_sticker() + + if not sticker: + await event.reply("Reply to a sticker to compress that bitch!") + return try: compression_quality = int(event.args) @@ -198,14 +190,8 @@ async def compressor(event): except ValueError: compression_quality = 15 - if reply and await ldr.is_sticker(reply): - sticker_webp_data = reply.sticker - else: - await event.reply("Reply to a sticker to compress that bitch!") - return - sticker_io = io.BytesIO() - await event.client.download_media(sticker_webp_data, sticker_io) + await event.client.download_media(sticker, sticker_io) await event.reply(file=await ldr.run_async(compressorsync, sticker_io, compression_quality)) diff --git a/ubot/modules/memes.py b/ubot/modules/memes.py index e9c3281..81c2b9e 100644 --- a/ubot/modules/memes.py +++ b/ubot/modules/memes.py @@ -17,7 +17,7 @@ zal_chars = " ̷̡̛̮͇̝͉̫̭͈͗͂̎͌̒̉̋́͜ ̵̠͕͍̌́̀̑̐̇̎̚͝ @ldr.add("cp") async def copypasta(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) text_arg = await shitpostify(text_arg) text_arg = await mockify(text_arg) @@ -32,7 +32,7 @@ async def copypasta(event): @ldr.add("mock") async def mock(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) mock_text = await mockify(text_arg) @@ -44,7 +44,7 @@ async def mock(event): @ldr.add("vap") async def vapor(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) vapor_text = await vaporize(text_arg) @@ -56,7 +56,7 @@ async def vapor(event): @ldr.add("pop") async def popifycmd(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) pop_text = await popify(text_arg) @@ -68,7 +68,7 @@ async def popifycmd(event): @ldr.add("cheem") async def cheemifycmd(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) cheems_text = await cheemify(text_arg) @@ -80,7 +80,7 @@ async def cheemifycmd(event): @ldr.add("zal") async def zalgo(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) zalgo_text = await zalgofy(text_arg) @@ -92,7 +92,7 @@ async def zalgo(event): @ldr.add("owo") async def owo(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) owo_text = await owoify(text_arg) @@ -104,7 +104,7 @@ async def owo(event): @ldr.add("yoda") async def yodafy(event): - text_arg, reply = await ldr.get_text(event, default=filler, return_msg=True) + text_arg, reply = await event.get_text(default=filler, return_msg=True) async with ldr.aioclient.get("http://yoda-api.appspot.com/api/v1/yodish", params={"text": text_arg}) as response: if response.status == 200: diff --git a/ubot/modules/scrapers.py b/ubot/modules/scrapers.py index 06b4f10..d1d5c3d 100644 --- a/ubot/modules/scrapers.py +++ b/ubot/modules/scrapers.py @@ -98,7 +98,7 @@ def pokemon_image_sync(sprite_io): @ldr.add("tts", help="Text to speech.") async def text_to_speech(event): - text, reply = await ldr.get_text(event, return_msg=True) + text, reply = await event.get_text(return_msg=True) if not text: await event.reply("Give me text or reply to text to use TTS.") @@ -123,7 +123,7 @@ async def text_to_speech(event): @ldr.add("ip", help="IP lookup.") async def ip_lookup(event): - ip = await ldr.get_text(event) + ip = await event.get_text() if not ip: await event.reply("Provide an IP!")