2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-22 10:09:40 +00:00
This commit is contained in:
Nick80835 2023-10-16 09:46:31 -04:00
commit 6b93e9d6b1
19 changed files with 1487 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
__pycache__
*.ini
*.session
*.session-journal
*.db

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
aiohttp
databases[sqlite]
pillow
praw
telethon
speedtest-cli

1
ubot/__init__.py Normal file
View File

@ -0,0 +1 @@
import ubot.micro_bot

1
ubot/__main__.py Normal file
View File

@ -0,0 +1 @@
#dummy

51
ubot/command_handler.py Normal file
View File

@ -0,0 +1,51 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from re import escape, search
from telethon import events
class CommandHandler():
def __init__(self, client, logger):
self.outgoing_commands = {}
self.incoming_commands = {}
self.logger = logger
client.add_event_handler(self.handle_outgoing, events.NewMessage(outgoing=True))
client.add_event_handler(self.handle_incoming, events.NewMessage(incoming=True))
async def handle_outgoing(self, event):
for cmd in self.outgoing_commands.keys():
if search(cmd, event.text):
event.pattern_match = search(cmd, event.text)
try:
await self.outgoing_commands.get(cmd)(event)
return
except Exception as exception:
self.logger.warn(f"{self.outgoing_commands.get(cmd).__name__} - {exception}")
await event.reply(f"`An error occurred in {self.outgoing_commands.get(cmd).__name__}: {exception}`")
async def handle_incoming(self, event):
for cmd in self.incoming_commands.keys():
if search(cmd, event.text):
event.pattern_match = search(cmd, event.text)
try:
await self.incoming_commands.get(cmd)(event)
return
except Exception as exception:
self.logger.warn(f"{self.incoming_commands.get(cmd).__name__} - {exception}")
await event.reply(f"`An error occurred in {self.incoming_commands.get(cmd).__name__}: {exception}`")

46
ubot/database.py Normal file
View File

@ -0,0 +1,46 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from databases import Database as db
class Database():
def __init__(self, client):
self.db = db("sqlite:///database.db")
client.loop.run_until_complete(self.db.connect())
async def ensure_table(self, table, columns):
await self.db.execute(f"create table if not exists {table} ({' TEXT, '.join(columns) + ' TEXT'})")
return ", ".join(columns)
async def single_row_write(self, table, columns, row, value):
column_string = await self.ensure_table(table, columns)
await self.db.execute(f"delete from {table} where {columns[0]} = '{row}'")
await self.db.execute(f"insert into {table}({column_string}) values ('{row}', '{value}')")
async def single_row_delete(self, table, columns, row):
await self.ensure_table(table, columns)
await self.db.execute(f"delete from {table} where {columns[0]} = '{row}'")
async def single_column_readall(self, table, columns, row):
await self.ensure_table(table, columns)
fetched_tuple = await self.db.fetch_all(f"select {row} from {table}")
fetched_list = [item[0] for item in fetched_tuple]
return fetched_list
async def single_row_read(self, table, columns, row):
await self.ensure_table(table, columns)
content = await self.db.fetch_one(f"select {columns[1]} from {table} where {columns[0]} = '{row}'")
return content[0]

84
ubot/loader.py Normal file
View File

@ -0,0 +1,84 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import glob
from importlib import import_module, reload
from os.path import basename, dirname, isfile
from re import escape
from telethon import events
from .command_handler import CommandHandler
class Loader():
def __init__(self, client, logger, settings):
self.loaded_modules = []
self.all_modules = []
self.client = client
self.logger = logger
self.settings = settings
self.command_handler = CommandHandler(client, logger)
self.botversion = "0.1.2"
def load_all_modules(self):
self._find_all_modules()
for module_name in self.all_modules:
self.loaded_modules.append(import_module("ubot.modules." + module_name))
def reload_all_modules(self):
self.command_handler.outgoing_commands = {}
self.command_handler.incoming_commands = {}
errors = ""
for module in self.loaded_modules:
try:
reload(module)
except Exception as exception:
errors += f"`Error while reloading {module.__name__} -> {exception}\n\n`"
raise exception
return errors or None
def add(self, **args):
prefix = escape(self.settings.get_config("cmd_prefix") or '.')
if args.get('noprefix', None):
del args['noprefix']
prefix = ''
if not args.get('isfilter', False) and args.get('pattern', None) is not None:
args['pattern'] = f"(?is)^{prefix}{args['pattern']}(?: |$)(.*)"
else:
del args['isfilter']
args['pattern'] = f"(?is)(.*){args['pattern']}(.*)"
def decorator(func):
self.command_handler.incoming_commands[args['pattern']] = func
return decorator
def _find_all_modules(self):
mod_paths = glob.glob(dirname(__file__) + "/modules/*.py")
self.all_modules = [
basename(f)[:-3] for f in mod_paths
if isfile(f) and f.endswith(".py")
]
system_index = self.all_modules.index("system")
self.all_modules.insert(0, self.all_modules.pop(system_index))

100
ubot/micro_bot.py Normal file
View File

@ -0,0 +1,100 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from logging import INFO, basicConfig, getLogger
from sys import version_info
import telethon as tt
from telethon.errors.rpcerrorlist import PhoneNumberInvalidError
from telethon.network.connection.tcpabridged import \
ConnectionTcpAbridged as CTA
from .database import Database
from .loader import Loader
from .settings import Settings
if version_info[0] < 3 or version_info[1] < 6:
print("This program requires at least Python 3.6.0 to work correctly, exiting.")
quit(1)
class MicroBot():
def __init__(self):
self.client = None
self.settings = Settings()
self.logger = None
self.loader = None
self.database = None
def start_microbot(self):
self.start_logger()
self.start_client()
self.start_database()
self.start_loader()
self.loader.load_all_modules()
self.client.run_until_disconnected()
def start_database(self):
self.database = Database(self.client)
def start_loader(self):
self.loader = Loader(self.client, self.logger, self.settings)
def start_logger(self):
basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=INFO)
self.logger = getLogger(__name__)
def _check_config(self, api_key, api_hash, session_name):
while api_key is None or api_key == "":
api_key = input("Enter your API key: ")
while api_hash is None or api_hash == "":
api_hash = input("Enter your API hash: ")
self.settings.set_config("api_key", api_key)
self.settings.set_config("api_hash", api_hash)
if session_name is None or session_name == "":
session_name = "user0"
self.settings.set_config("session_name", session_name)
return api_key, api_hash, session_name
def start_client(self):
session_name = self.settings.get_config("session_name")
api_key = self.settings.get_config("api_key")
api_hash = self.settings.get_config("api_hash")
api_key, api_hash, session_name = self._check_config(api_key, api_hash, session_name)
self.client = tt.TelegramClient(session_name, api_key, api_hash, connection=CTA)
try:
self.client.start()
except PhoneNumberInvalidError:
self.logger.error("The phone number provided is invalid, exiting.")
exit(2)
async def stop_client(self, reason=None):
if reason:
self.logger.info("Stopping client for reason: %s", reason)
else:
self.logger.info("Stopping client.")
await self.client.disconnect()
micro_bot = MicroBot()
micro_bot.start_microbot()

171
ubot/modules/catdog.py Normal file
View File

@ -0,0 +1,171 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from aiohttp import ClientSession
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
CAT_URL = 'http://api.thecatapi.com/v1/images/search'
DOG_URL = 'http://api.thedogapi.com/v1/images/search'
SHIBE_URL = 'http://shibe.online/api/shibes'
BIRD_URL = 'http://shibe.online/api/birds'
CAT_API_KEY = 'e5a56813-be40-481c-9c8a-a6585c37c1fe'
DOG_API_KEY = '105555df-5c50-40fe-bd59-d15a17ce1c2e'
CAT_HEADERS = {"x-api-key": CAT_API_KEY}
DOG_HEADERS = {"x-api-key": DOG_API_KEY}
IMGPARAM = {"mime_types": "jpg,png"}
GIFPARAM = {"mime_types": "gif"}
async def neko_atsume(params):
session = ClientSession()
async with session.get(CAT_URL, params=params, headers=CAT_HEADERS) as response:
if response.status == 200:
neko = await response.json()
else:
neko = response.status
await session.close()
return neko
async def inu_atsume(params):
session = ClientSession()
async with session.get(DOG_URL, params=params, headers=DOG_HEADERS) as response:
if response.status == 200:
inu = await response.json()
else:
inu = response.status
await session.close()
return inu
async def shibe_inu_atsume():
session = ClientSession()
async with session.get(SHIBE_URL, params=None, headers=None) as response:
if response.status == 200:
shibe_inu = await response.json()
else:
shibe_inu = response.status
await session.close()
return shibe_inu
async def tori_atsume():
session = ClientSession()
async with session.get(BIRD_URL, params=None, headers=None) as response:
if response.status == 200:
tori = await response.json()
else:
tori = response.status
await session.close()
return tori
@ldr.add(pattern="shibe")
async def shibe(event):
shibe_inu = await shibe_inu_atsume()
if isinstance(shibe_inu, int):
await event.reply(f"`There was an error finding the shibes! :( -> {shibe_inu}`")
return
await event.reply(file=shibe_inu[0])
@ldr.add(pattern="bird")
async def bird(event):
tori = await tori_atsume()
if isinstance(tori, int):
await event.reply(f"`There was an error finding the birdies! :( -> {tori}`")
return
await event.reply(file=tori[0])
@ldr.add(pattern="cat")
async def cat(event):
neko = await neko_atsume(IMGPARAM)
if isinstance(neko, int):
await event.reply(f"`There was an error finding the cats! :( -> {neko}`")
return
await event.reply(file=neko[0]["url"])
@ldr.add(pattern="cathd")
async def cathd(event):
neko = await neko_atsume(IMGPARAM)
if isinstance(neko, int):
await event.reply(f"`There was an error finding the cats! :( -> {neko}`")
return
await event.reply(file=neko[0]["url"], force_document=True)
@ldr.add(pattern="catgif")
async def catgif(event):
neko = await neko_atsume(GIFPARAM)
if isinstance(neko, int):
await event.reply(f"`There was an error finding the cats! :( -> {neko}`")
return
await event.reply(file=neko[0]["url"])
@ldr.add(pattern="dog")
async def dog(event):
inu = await inu_atsume(IMGPARAM)
if isinstance(inu, int):
await event.reply(f"`There was an error finding the dogs! :( -> {inu}`")
return
await event.reply(file=inu[0]["url"])
@ldr.add(pattern="doghd")
async def doghd(event):
inu = await inu_atsume(IMGPARAM)
if isinstance(inu, int):
await event.reply(f"`There was an error finding the dogs! :( -> {inu}`")
return
await event.reply(file=inu[0]["url"], force_document=True).delete()
@ldr.add(pattern="doggif")
async def doggif(event):
inu = await inu_atsume(GIFPARAM)
if isinstance(inu, int):
await event.reply(f"`There was an error finding the dogs! :( -> {inu}`")
return
await event.reply(file=inu[0]["url"])

74
ubot/modules/danbooru.py Normal file
View File

@ -0,0 +1,74 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from aiohttp import ClientSession
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
DAN_URL = "http://danbooru.donmai.us/posts.json"
@ldr.add(pattern="dan(s|x|q|)")
async def danbooru(event):
if "x" in event.pattern_match.group(0):
rating = "Rating:explicit"
elif "s" in event.pattern_match.group(0):
rating = "Rating:safe"
elif "q" in event.pattern_match.group(0):
rating = "Rating:questionable"
else:
rating = ""
search_query = event.pattern_match.group(2)
params = {"limit": 1,
"random": "true",
"tags": f"{rating} {search_query}".strip()}
session = ClientSession()
async with session.get(DAN_URL, params=params) as response:
if response.status == 200:
response = await response.json()
else:
await event.reply(f"`An error occurred, response code: `**{response.status}**")
return
await session.close()
if not response:
await event.reply(f"`No results for query: `**{search_query}**")
return
valid_urls = []
for url in ['file_url', 'large_file_url', 'source']:
if url in response[0].keys():
valid_urls.append(response[0][url])
if not valid_urls:
await event.reply(f"`Failed to find URLs for query: `**{search_query}**")
return
for image_url in valid_urls:
try:
await event.reply(file=image_url)
return
except:
pass
await event.reply(f"`Failed to fetch media for query: `**{search_query}**")

136
ubot/modules/deepfry.py Normal file
View File

@ -0,0 +1,136 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Original source for the deepfrying code (used under the following license): https://github.com/Ovyerus/deeppyer
# MIT License
#
# Copyright (c) 2017 Ovyerus
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import io
from random import randint, uniform
from PIL import Image, ImageEnhance, ImageOps
from telethon.tl.types import DocumentAttributeFilename
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
@ldr.add(pattern="deepfry")
async def deepfryer(event):
try:
frycount = int(event.pattern_match.group(1))
if frycount < 1:
frycount = 1
elif frycount > 10:
frycount = 10
except ValueError:
frycount = 1
replied_fry = True
if event.is_reply:
reply_message = await event.get_reply_message()
data = await check_media(reply_message)
if isinstance(data, bool):
await event.reply("`I can't deep fry that!`")
return
else:
data = await check_media(event)
replied_fry = False
if isinstance(data, bool):
await event.reply("`Reply to an image or sticker or caption an image to deep fry it!`")
return
# Download photo (highres) as byte array
image = io.BytesIO()
await event.client.download_media(data, image)
image = Image.open(image)
# Fry the image
image = image.convert("RGB")
for _ in range(frycount):
image = await deepfry(image)
fried_io = io.BytesIO()
fried_io.name = "image.jpeg"
image.save(fried_io, "JPEG")
fried_io.seek(0)
if replied_fry:
await reply_message.reply(file=fried_io)
else:
await event.reply(file=fried_io)
async def deepfry(img):
# Crush image to hell and back
img = ImageOps.posterize(img, randint(3, 7))
# Generate colour overlay
overlay = img.copy()
overlay = ImageEnhance.Contrast(overlay).enhance(uniform(0.7, 1.8))
overlay = ImageEnhance.Brightness(overlay).enhance(uniform(0.8, 1.3))
overlay = ImageEnhance.Color(overlay).enhance(uniform(0.7, 1.4))
# Blend random colors onto and sharpen the image
img = Image.blend(img, overlay, uniform(0.1, 0.4))
img = ImageEnhance.Sharpness(img).enhance(randint(5, 200))
return img
async def check_media(reply_message):
if reply_message and reply_message.media:
if reply_message.photo:
data = reply_message.photo
elif reply_message.document:
if DocumentAttributeFilename(file_name='AnimatedSticker.tgs') in reply_message.media.document.attributes:
return False
if reply_message.gif or reply_message.video or reply_message.audio or reply_message.voice:
return False
data = reply_message.media.document
else:
return False
else:
return False
if not data or data is None:
return False
else:
return data

133
ubot/modules/evaluation.py Normal file
View File

@ -0,0 +1,133 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import inspect
import io
from PIL import Image
from speedtest import Speedtest
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
@ldr.add(pattern="speed")
async def iamspeed(event):
test_message = await event.reply("`Running speed test…`")
test = Speedtest()
test.get_best_server()
test.download()
test.upload()
test.results.share()
result = test.results.dict()
await test_message.edit(
f"`Started at: {result['timestamp']}\n"
f"Download: {speed_convert(result['download'])}\n"
f"Upload: {speed_convert(result['upload'])}\n"
f"Ping: {result['ping']} milliseconds\n"
f"ISP: {result['client']['isp']}`"
)
def speed_convert(size):
power = 2**10
zero = 0
units = {0: '', 1: 'Kilobits/s', 2: 'Megabits/s', 3: 'Gigabits/s', 4: 'Terabits/s'}
while size > power:
size /= power
zero += 1
return f"{round(size, 2)} {units[zero]}"
@ldr.add(pattern="chatid")
async def chatidgetter(event):
if event.is_reply:
reply = await event.get_reply_message()
if reply.forward and reply.forward.channel_id:
await event.reply(f"**Channel ID:**` {reply.forward.channel_id}`")
return
chat_id = reply.chat_id
else:
chat_id = event.chat_id
await event.reply(f"**Chat ID:**` {chat_id}`")
@ldr.add(pattern="userid")
async def useridgetter(event):
if event.is_reply:
reply = await event.get_reply_message()
user_id = reply.from_id
else:
user_id = event.from_id
await event.reply(f"**User ID:**` {user_id}`")
@ldr.add(pattern="profile")
async def userprofilegetter(event):
user_arg = event.pattern_match.group(1)
if user_arg:
try:
user_entity = await event.client.get_entity(user_arg)
except (ValueError, TypeError):
await event.reply("`The ID or username you provided was invalid!`")
return
elif event.is_reply:
reply = await event.get_reply_message()
reply_id = reply.from_id
if reply_id:
try:
user_entity = await event.client.get_entity(reply_id)
except (ValueError, TypeError):
await event.reply("`There was an error getting the user!`")
return
else:
await event.reply("`The user may have super sneaky privacy settings enabled!`")
return
else:
await event.reply("`Give me a user ID, username or reply!`")
return
userid = user_entity.id
username = user_entity.username
userfullname = f"{user_entity.first_name} {user_entity.last_name or ''}"
await event.reply(f"**Full Name:** {userfullname}\n**Username:** @{username}\n**User ID:** {userid}")
@ldr.add(pattern="stickpng")
async def stickertopng(event):
reply = await event.get_reply_message()
if reply.sticker:
sticker_webp_data = reply.sticker
else:
await event.reply("`Reply to a sticker to get it as a PNG file!`")
return
sticker_webp_io = io.BytesIO()
await event.client.download_media(sticker_webp_data, sticker_webp_io)
sticker_webp = Image.open(sticker_webp_io)
sticker_png_io = io.BytesIO()
sticker_webp.save(sticker_png_io, "PNG")
sticker_png_io.name = "sticker.png"
sticker_png_io.seek(0)
await event.reply(file=sticker_png_io, force_document=True)

208
ubot/modules/memes.py Normal file
View File

@ -0,0 +1,208 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncio import sleep
from random import choice, randint
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
emoji = list("😂😝🤪🤩😤🥵🤯🥶😱🤔😩🙄💀👻🤡😹👀👁👌💦🔥🌚🌝🌞🔫💯")
b_emoji = "🅱️"
a_emoji = "🅰️"
i_emoji = ""
owo_faces = "owo uwu owu uwo u-u o-o OwO UwU @-@ ;-; ;_; ._. (._.) (o-o) ('._.) (。◕‿‿◕。)" \
" (。◕‿◕。) (─‿‿─) ◔⌣◔ ◉_◉".split(sep=" ")
zal_chars = " ̷̡̛̮͇̝͉̫̭͈͗͂̎͌̒̉̋́͜ ̵̠͕͍̩̟͚͍̞̳̌́̀̑̐̇̎̚͝ ̸̻̠̮̬̻͇͈̮̯̋̄͛̊͋̐̇͝͠ ̵̧̟͎͈̪̜̫̪͖̎͛̀͋͗́̍̊͠ ̵͍͉̟͕͇͎̖̹̔͌̊̏̌̽́̈́͊ͅ ̷̥͚̼̬̦͓͇̗͕͊̏͂͆̈̀̚͘̚ ̵̢̨̗̝̳͉̱̦͖̔̾͒͊͒̎̂̎͝ ̵̞̜̭̦̖̺͉̞̃͂͋̒̋͂̈́͘̕͜ ̶̢̢͇̲̥̗̟̏͛̇̏̊̑̌̔̚ͅͅ ̷̮͖͚̦̦̞̱̠̰̍̆̐͆͆͆̈̌́ ̶̲͚̪̪̪͍̹̜̬͊̆͋̄͒̾͆͝͝ ̴̨̛͍͖͎̞͍̞͕̟͑͊̉͗͑͆͘̕ ̶͕̪̞̲̘̬͖̙̞̽͌͗̽̒͋̾̍̀ ̵̨̧̡̧̖͔̞̠̝̌̂̐̉̊̈́́̑̓ ̶̛̱̼̗̱̙͖̳̬͇̽̈̀̀̎̋͌͝ ̷̧̺͈̫̖̖͈̱͎͋͌̆̈̃̐́̀̈".replace(" ", "")
@ldr.add(pattern="cp")
async def copypasta(event):
text_arg, reply = await get_text_arg(event)
text_arg = await shitpostify(text_arg)
text_arg = await mockify(text_arg)
text_arg = await emojify(text_arg)
cp_text = await vaporize(text_arg)
if reply:
await reply.reply(cp_text)
else:
await event.reply(cp_text)
@ldr.add(pattern="mock")
async def mock(event):
text_arg, reply = await get_text_arg(event)
mock_text = await mockify(text_arg)
if reply:
await reply.reply(mock_text)
else:
await event.reply(mock_text)
@ldr.add(pattern="vap")
async def vapor(event):
text_arg, reply = await get_text_arg(event)
vapor_text = await vaporize(text_arg)
if reply:
await reply.reply(vapor_text)
else:
await event.reply(vapor_text)
@ldr.add(pattern="zal")
async def zalgo(event):
text_arg, reply = await get_text_arg(event)
zalgo_text = await zalgofy(text_arg)
if reply:
await reply.reply(zalgo_text)
else:
await event.reply(zalgo_text)
@ldr.add(pattern="owo")
async def owo(event):
text_arg, reply = await get_text_arg(event)
owo_text = await owoify(text_arg)
if reply:
await reply.reply(owo_text)
else:
await event.reply(owo_text)
async def get_text_arg(event):
text_arg = event.pattern_match.group(1)
reply = False
if text_arg:
pass
elif event.is_reply:
reply = await event.get_reply_message()
text_arg = reply.text
else:
text_arg = "Give me some text to fuck it up!"
return text_arg, reply
async def shitpostify(text):
text = text.replace("dick", "peepee")
text = text.replace("ck", "cc")
text = text.replace("lol", "honk honk")
text = text.replace("though", "tho")
text = text.replace("cat", "pussy")
text = text.replace("dark", "dank")
return text
async def mockify(text):
mock_text = ""
for letter in text:
if len(mock_text) >= 2:
if ''.join(mock_text[-2:-1]).islower():
mock_text += letter.upper()
continue
if ''.join(mock_text[-2:-1]).isupper():
mock_text += letter.lower()
continue
if randint(1, 2) == randint(1, 2):
mock_text += letter.lower()
else:
mock_text += letter.upper()
return mock_text
async def emojify(text):
text = text.replace("ab", "🆎")
text = text.replace("cl", "🆑")
text = text.replace("b", "🅱️")
text = text.replace("a", "🅰️")
text = text.replace("i", "")
text = text.replace("AB", "🆎")
text = text.replace("CL", "🆑")
text = text.replace("B", "🅱️")
text = text.replace("A", "🅰️")
text = text.replace("I", "")
emoji_text = ""
for letter in text:
if letter == " ":
emoji_text += choice(emoji)
else:
emoji_text += letter
return emoji_text
async def vaporize(text):
vapor_text = ""
char_distance = 65248
for letter in text:
ord_letter = ord(letter)
if ord('!') <= ord_letter <= ord('~'):
letter = chr(ord_letter + char_distance)
vapor_text += letter
return vapor_text
async def owoify(text):
text = text.replace("r", "w")
text = text.replace("R", "W")
text = text.replace("n", "ny")
text = text.replace("N", "NY")
text = text.replace("ll", "w")
text = text.replace("LL", "W")
text = text.replace("l", "w")
text = text.replace("L", "W")
text += f" {choice(owo_faces)}"
return text
async def zalgofy(text):
zalgo_text = ""
for letter in text:
if letter == " ":
zalgo_text += letter
continue
letter += choice(zal_chars)
letter += choice(zal_chars)
letter += choice(zal_chars)
zalgo_text += letter
return zalgo_text

119
ubot/modules/notes.py Normal file
View File

@ -0,0 +1,119 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
db = micro_bot.database
note_columns = ["notename", "notecontent"]
@ldr.add(pattern="save")
async def savenote(event):
notename, notecontent = await get_text_arg(event)
if not notename or not notecontent:
await event.edit("`Provide both a note name and content to save!`")
return
await event.edit(f"`Saving to note `**{notename}**`…`")
await db.single_row_write("Notes", note_columns, notename, notecontent)
await event.edit(f"`Successfully saved to note `**{notename}**`!`")
@ldr.add(pattern="get")
async def getnote(event):
notename = event.pattern_match.group(1).replace(" ", "_")
if not notename:
notelist = '\n'.join(await db.single_column_readall("Notes", note_columns, "notename"))
if notelist:
await event.edit(f"`Provide a note name to get its content, saved notes:`\n**{notelist}**")
return
else:
await event.edit(f"`You haven't saved any notes!\nUse `**{micro_bot.settings.get_config('cmd_prefix') or '.'}save**` to save notes.`")
return
notecontent = await db.single_row_read("Notes", note_columns, notename)
if notecontent:
await event.edit(f"{notecontent}")
else:
notelist = '\n'.join(await db.single_column_readall("Notes", note_columns, "notename"))
if notelist:
await event.edit(f"`Note `**{notename}**` not found, saved notes:`\n**{notelist}**")
else:
await event.edit(f"`You haven't saved any notes!\nUse `**{micro_bot.settings.get_config('cmd_prefix') or '.'}save**` to save notes.`")
@ldr.add(incoming=True, noprefix=True, pattern="#(.*)")
async def getnoteincoming(event):
if not micro_bot.settings.get_bool("incoming_allowed"):
return
notename = event.pattern_match.group(0).replace(" ", "_").lstrip("#")
if not notename:
notelist = '\n'.join(await db.single_column_readall("Notes", note_columns, "notename"))
if notelist:
await event.reply(f"`Provide a note name to get its content, saved notes:`\n**{notelist}**")
return
else:
await event.reply(f"`There aren't any saved notes!`")
return
notecontent = await db.single_row_read("Notes", note_columns, notename)
if notecontent:
await event.reply(f"{notecontent}")
else:
notelist = '\n'.join(await db.single_column_readall("Notes", note_columns, "notename"))
if notelist:
await event.reply(f"`Note `**{notename}**` not found, saved notes:`\n**{notelist}**")
else:
await event.reply(f"`There aren't any saved notes!`")
@ldr.add(pattern="del")
async def delnote(event):
notename = event.pattern_match.group(1).replace(" ", "_")
if not notename:
await event.edit("`I need a notes name to delete it!`")
return
await db.single_row_delete("Notes", note_columns, notename)
await event.edit(f"`Note `**{notename}**` successfully deleted!`")
async def get_text_arg(event):
text_arg = event.pattern_match.group(1)
notename = None
notecontent = None
if event.is_reply and text_arg:
reply = await event.get_reply_message()
notename = text_arg.replace(" ", "_")
notecontent = reply.text
elif text_arg:
notename = text_arg.split(" ")[0]
notecontent = " ".join(text_arg.split(" ")[1:])
return notename, notecontent

153
ubot/modules/reddit.py Normal file
View File

@ -0,0 +1,153 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
from random import choice
import praw
from aiohttp import ClientSession
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
REDDIT = praw.Reddit(client_id='-fmzwojFG6JkGg',
client_secret=None,
user_agent='TG_Userbot')
VALID_ENDS = (".mp4", ".jpg", ".jpeg", ".png", ".gif")
async def imagefetcherfallback(sub):
hot = REDDIT.subreddit(sub).hot()
hot_list = list(hot.__iter__())
for _ in range(10):
post = choice(hot_list)
if post.url:
if post.url.endswith(VALID_ENDS):
return post.url, post.title
return None, None
async def titlefetcherfallback(sub):
hot = REDDIT.subreddit(sub).hot()
hot_list = list(hot.__iter__())
return choice(hot_list).title
async def imagefetcher(event, sub):
image_url = False
for _ in range(10):
post = REDDIT.subreddit(sub).random()
if not post:
image_url, title = await imagefetcherfallback(sub)
break
if post.url:
for ending in VALID_ENDS:
if post.url.endswith(ending):
file_ending = ending
image_url = post.url
title = post.title
break
if not image_url:
await event.reply(f"`Failed to find any valid content on `**r/{sub}**`!`")
return
try:
image_io = io.BytesIO()
session = ClientSession()
async with session.get(image_url) as response:
if response.status == 200:
image_io.write(await response.read())
image_io.name = f"reddit_content{file_ending}"
image_io.seek(0)
else:
raise Exception
await session.close()
await event.reply(title, file=image_io)
except:
await session.close()
await event.reply(f"`Failed to download content from `**r/{sub}**`!`\n`Title: `**{title}**\n`URL: `{image_url}")
async def titlefetcher(event, sub):
post = REDDIT.subreddit(sub).random()
if not post:
title = await titlefetcherfallback(sub)
else:
title = post.title
await event.reply(title)
@ldr.add(pattern="redi")
async def redimg(event):
sub = event.pattern_match.group(1)
if sub:
await imagefetcher(event, sub)
else:
await event.reply("Syntax: .redi <subreddit name>")
@ldr.add(pattern="redt")
async def redtit(event):
sub = event.pattern_match.group(1)
if sub:
await titlefetcher(event, sub)
else:
await event.reply("Syntax: .redt <subreddit name>")
@ldr.add(pattern="suffer")
async def makemesuffer(event):
await imagefetcher(event, "MakeMeSuffer")
@ldr.add(pattern="snafu")
async def coaxedintoasnafu(event):
await imagefetcher(event, "CoaxedIntoASnafu")
@ldr.add(pattern="aita")
async def amitheasshole(event):
await titlefetcher(event, "AmITheAsshole")
@ldr.add(pattern="jon(x|)")
async def imsorryjon(event):
if "x" in event.pattern_match.group(0):
sub = "ImReallySorryJon"
else:
sub = "ImSorryJon"
await imagefetcher(event, sub)
@ldr.add(pattern="tihi")
async def thanksihateit(event):
await imagefetcher(event, "TIHI")

94
ubot/modules/system.py Normal file
View File

@ -0,0 +1,94 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from platform import python_version, uname
from time import time_ns
from telethon import version
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
#@ldr.add(pattern="reload")
async def reload_modules(event):
await event.edit("`Reloading modules…`")
errors = ldr.reload_all_modules()
if errors:
await event.edit(errors)
else:
try:
await event.delete()
except:
pass
@ldr.add(pattern="alive")
async def alive(event):
alive_format = "`μBot is running under {0}.\n\n" \
"Version: {1}\n" \
"Telethon: {2}\n" \
"Python: {3}`"
await event.reply(alive_format.format(uname().node, ldr.botversion, version.__version__, python_version()))
#@ldr.add(pattern="shutdown")
async def shutdown(event):
await event.edit("`Goodbye…`")
await micro_bot.stop_client()
@ldr.add(pattern="ping")
async def ping(event):
start = time_ns()
reply = await event.reply("`Ping…`")
time_taken_ms = (time_ns() - start) / 1000000
await reply.edit(f"`Ping… Pong! -> `**{time_taken_ms}**`ms`")
#@ldr.add(pattern="prefix")
async def change_prefix(event):
new_prefix = event.pattern_match.group(1)
if not new_prefix:
await event.edit("`Please specify a valid command prefix!`")
return
micro_bot.settings.set_config("cmd_prefix", new_prefix)
errors = ldr.reload_all_modules()
if errors:
await event.edit(f"`Command prefix successfully changed to `**{new_prefix}**` but there were errors:`\n\n{errors}")
else:
await event.edit(f"`Command prefix successfully changed to `**{new_prefix}**`!`")
@ldr.add(pattern="repo")
async def bot_repo(event):
await event.reply("https://github.com/Nick80835/microbot")
#@ldr.add(pattern="toggleincoming")
async def toggleincoming(event):
if micro_bot.settings.get_bool("incoming_allowed"):
micro_bot.settings.set_config("incoming_allowed", "False")
await event.edit("`Successfully disabled incoming commands!`")
else:
micro_bot.settings.set_config("incoming_allowed", "True")
await event.edit("`Successfully enabled incoming commands!`")

70
ubot/modules/urbandict.py Normal file
View File

@ -0,0 +1,70 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
from aiohttp import ClientSession
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
UD_QUERY_URL = 'http://api.urbandictionary.com/v0/define'
UD_RANDOM_URL = 'http://api.urbandictionary.com/v0/random'
@ldr.add(pattern="ud")
async def urban_dict(event):
udquery = event.pattern_match.group(1)
if udquery:
params = {'term': udquery}
url = UD_QUERY_URL
else:
params = None
url = UD_RANDOM_URL
session = ClientSession()
async with session.get(url, params=params) as response:
if response.status == 200:
response = await response.json()
else:
await event.reply(f"`An error occurred, response code:` **{response.status}**")
return
await session.close()
if response['list']:
response_word = response['list'][0]
else:
await event.reply(f"`No results for query:` **{udquery}**")
return
definition = f"**{response_word['word']}**: `{response_word['definition']}`"
if response_word['example']:
definition += f"\n\n**Example**: `{response_word['example']}`"
definition = definition.replace("[", "").replace("]", "")
if len(definition) >= 4096:
file_io = io.BytesIO()
file_io.write(bytes(definition.encode('utf-8')))
file_io.name = f"definition of {response_word['word']}.txt"
await event.reply(file=file_io, caption="`Output was too large, sent it as a file.`")
return
await event.reply(definition)

Binary file not shown.

35
ubot/settings.py Normal file
View File

@ -0,0 +1,35 @@
# Copyright (C) 2019 Nick Filmer (nick80835@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from configparser import SafeConfigParser
class Settings():
def __init__(self):
self.config = SafeConfigParser()
self.config.read("settings.ini")
def get_config(self, key):
return self.config.get("DEFAULT", key, fallback=None)
def get_bool(self, key):
return bool(self.config.get("DEFAULT", key, fallback=False) == "True")
def set_config(self, key, value):
self.config.set("DEFAULT", key, value)
with open('settings.ini', 'w') as config_file:
self.config.write(config_file)
config_file.close()