2
0
mirror of https://github.com/Nick80835/microbot synced 2025-08-23 18:49:31 +00:00
microbot/ubot/modules/scrapers.py

224 lines
7.7 KiB
Python
Raw Normal View History

# SPDX-License-Identifier: GPL-2.0-or-later
import io
import os
import re
import pafy
from gtts import gTTS
from howdoi import howdoi
2020-06-17 14:29:34 -04:00
from PIL import Image
2020-07-23 10:29:01 -04:00
from telethon.tl.types import DocumentAttributeVideo
2020-06-19 14:40:41 -04:00
from ubot.micro_bot import ldr
# Set the HOWDOI_SEARCH_ENGINE environment variable to "bing" at import time
# (presumably read by the howdoi library to pick its search backend — confirm).
os.environ["HOWDOI_SEARCH_ENGINE"] = "bing"
2020-07-21 09:44:28 -04:00
@ldr.add("dadjoke", help="Fetches the most funny shit you've ever read.")
async def dadjoke(event):
    """Reply with a joke fetched from icanhazdadjoke.com.

    The JSON variant of the endpoint is requested through the ``Accept``
    header. A non-200 answer is reported to the chat with its status code.
    """
    request = ldr.aioclient.get("https://icanhazdadjoke.com/", headers={"Accept": "application/json"})

    async with request as response:
        # Bail out early on any HTTP failure.
        if response.status != 200:
            await event.reply(f"An error occurred: **{response.status}**")
            return

        payload = await response.json()
        joke_text = payload["joke"]

    await event.reply(joke_text)
2020-07-21 09:44:28 -04:00
@ldr.add("fact", help="Fetches random facts.")
async def randomfact(event):
    """Reply with a random English fact from uselessfacts.jsph.pl.

    Backticks in the fact are replaced with apostrophes before replying.
    A non-200 answer is reported to the chat with its status code.
    """
    request = ldr.aioclient.get("https://uselessfacts.jsph.pl/random.json", params={"language": "en"})

    async with request as response:
        if response.status != 200:
            await event.reply(f"An error occurred: **{response.status}**")
            return

        payload = await response.json()
        fact_text = payload["text"].replace("`", "'")

    await event.reply(fact_text)
2020-07-21 09:44:28 -04:00
@ldr.add("fakeword", help="Fetches random fake words.")
async def fakeword(event):
    """Reply with a made-up word, its definition and an example sentence.

    The data comes from the ``word`` object of thisworddoesnotexist.com's
    random-word endpoint. A non-200 answer is reported with its status code.
    """
    async with ldr.aioclient.get("https://www.thisworddoesnotexist.com/api/random_word.json") as response:
        if response.status != 200:
            await event.reply(f"An error occurred: **{response.status}**")
            return

        entry = (await response.json())["word"]
        word, definition, example = entry["word"], entry["definition"], entry["example"]

    await event.reply(f"**{word}:** __{definition}__\n\n**Example:** __{example}__")
2020-07-21 09:44:28 -04:00
@ldr.add("pokemon", pattern_extra="(s|)", help="Fetches Pokemon sprites, requires a name or ID as an argument.")
async def pokemon_image(event):
    """Reply with a Pokémon's front sprite as an upscaled WebP sticker.

    The Pokémon name or ID is read from ``event.args``. When the first entry
    of ``event.other_args`` is truthy (the optional ``s`` captured by
    ``pattern_extra``) the shiny sprite is used instead of the default one.

    HTTP failures from either request are reported to the chat with the
    status code; a missing sprite URL is reported with its own message.
    """
    from urllib.parse import quote

    # PokeAPI resource names are lowercase, so normalise the user's input.
    query = event.args.strip().lower() if event.args else ""

    if not query:
        await event.reply("Specify a Pokémon name!")
        return

    # Percent-encode the path segment so characters such as "/", "?" or "#"
    # in user input cannot alter which endpoint is requested.
    async with ldr.aioclient.get("https://pokeapi.co/api/v2/pokemon/" + quote(query, safe="")) as response:
        if response.status != 200:
            await event.reply(f"An error occurred: **{response.status}**")
            return

        sprite_key = "front_shiny" if event.other_args[0] else "front_default"
        sprite_url = (await response.json())["sprites"][sprite_key]

    if not sprite_url:
        await event.reply("That Pokémon config doesnt have an available sprite!")
        return

    async with ldr.aioclient.get(sprite_url) as response:
        if response.status != 200:
            await event.reply(f"An error occurred: **{response.status}**")
            return

        sprite_io = await response.read()

    sticker_image = Image.open(io.BytesIO(sprite_io))
    # Trim to the bounding box of the non-empty pixels, then enlarge 4x with
    # nearest-neighbour resampling so pixel edges stay sharp.
    sticker_image = sticker_image.crop(sticker_image.getbbox())
    sticker_image = sticker_image.resize((sticker_image.size[0]*4, sticker_image.size[1]*4), Image.NEAREST)

    sticker_io = io.BytesIO()
    sticker_image.save(sticker_io, "WebP", quality=99)
    sticker_io.seek(0)
    # The ".webp" name is attached to the in-memory buffer before upload
    # (presumably so the client sends it as a sticker — confirm).
    sticker_io.name = "sticker.webp"

    await event.reply(file=sticker_io)
@ldr.add("hdi")
async def howdoi_cmd(event):
    """Answer a "how do I ..." question using the howdoi library.

    ``event.args`` is split on spaces and parsed with howdoi's own argument
    parser, so howdoi command-line flags in the message are honoured. The
    answer has runs of blank lines collapsed before it is sent back.

    A usage hint is sent when no question is given or when the arguments are
    rejected by the parser.
    """
    if not event.args:
        await event.reply(f"Syntax: {ldr.prefix()}hdi <question>")
        return

    try:
        # argparse handles "--help" and unknown flags by calling sys.exit().
        # Left uncaught, that SystemExit would escape the handler instead of
        # producing a reply, so treat it as a usage error.
        parsed_args = vars(howdoi.get_parser().parse_args(event.args.split(' ')))
    except SystemExit:
        await event.reply(f"Syntax: {ldr.prefix()}hdi <question>")
        return

    response = howdoi.howdoi(parsed_args)
    # Collapse any run of two or more newlines into a single blank line.
    response = re.sub(r'\n\n+', '\n\n', response).strip()

    await event.reply(f"**Query:**\n{event.args}\n**Answer:**\n{response}")
2020-07-21 09:44:28 -04:00
@ldr.add("tts", help="Text to speech.")
async def text_to_speech(event):
    """Convert the given or replied-to text to speech and send it as a voice note.

    The text and the message to reply to are obtained from ``ldr.get_text``.
    The audio is generated by gTTS into an in-memory buffer named ``tts.mp3``.
    ``AssertionError`` and ``RuntimeError`` raised during synthesis are
    reported to the chat instead of propagating.
    """
    text, reply = await ldr.get_text(event, return_msg=True)

    if not text:
        await event.reply("Give me text or reply to text to use TTS.")
        return

    audio_buffer = io.BytesIO()
    audio_buffer.name = "tts.mp3"

    try:
        gTTS(text, lang="EN").write_to_fp(audio_buffer)
    except AssertionError:
        await event.reply('The text is empty.\nNothing left to speak after pre-precessing, tokenizing and cleaning.')
        return
    except RuntimeError:
        await event.reply('Error loading the languages dictionary.')
        return

    # Rewind so the upload reads the audio from the start.
    audio_buffer.seek(0)

    await event.client.send_file(event.chat_id, audio_buffer, voice_note=True, reply_to=reply)
2020-07-21 09:44:28 -04:00
@ldr.add("ip", help="IP lookup.")
async def ip_lookup(event):
    """Look up an IP address on ip-api.com and reply with the result.

    Every key of the JSON answer becomes one ``**Label:** value`` line.
    A few keys have fixed labels; all others are converted from camelCase
    to a capitalised, space-separated label, and falsy values are shown
    as ``None``.
    """
    ip = await ldr.get_text(event)

    if not ip:
        await event.reply("Provide an IP!")
        return

    async with ldr.aioclient.get(f"http://ip-api.com/json/{ip}") as response:
        if response.status != 200:
            await event.reply(f"An error occurred when looking for **{ip}**: **{response.status}**")
            return

        lookup_json = await response.json()

    # Keys whose label cannot be derived by the camelCase rule below.
    special = {"lat": "Latitude", "lon": "Longitude", "isp": "ISP", "as": "AS", "asname": "AS name"}
    fixed_lookup = {}

    for raw_key, raw_value in lookup_json.items():
        if raw_key in special:
            # Special keys keep their value as-is, even when it is falsy.
            fixed_lookup[special[raw_key]] = str(raw_value)
        else:
            # "countryCode" -> "country Code" -> "Country code"
            label = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", raw_key).capitalize()
            fixed_lookup[label] = str(raw_value) if raw_value else "None"

    text = "".join(f"**{label}:** {shown}\n" for label, shown in fixed_lookup.items())

    await event.reply(text)
2020-07-21 09:44:28 -04:00
@ldr.add("corona", help="Fetches Coronavirus stats, takes an optional country name as an argument.")
async def corona(event):
    """Reply with Coronavirus case, death and recovery counts.

    With an argument, the stats of that country are requested; without one,
    the global totals are requested. A non-200 answer is reported to the
    chat with its status code.
    """
    country = event.args

    if country:
        url = f"https://corona.lmao.ninja/v2/countries/{country}"
    else:
        url = "https://corona.lmao.ninja/v2/all"

    async with ldr.aioclient.get(url) as response:
        if response.status != 200:
            # The two request modes use differently formatted error messages.
            if country:
                await event.reply(f"An error occurred, response code: **{response.status}**")
            else:
                await event.reply(f"`An error occurred, response code: `**{response.status}**")
            return

        stats = await response.json()

    heading = f"Stats for **{stats['country']}**" if country else "Global stats"
    response_text = (
        f"{heading}\n\n"
        f"**Cases:** {stats['cases']} ({stats['todayCases']} today)\n"
        f"**Deaths:** {stats['deaths']} ({stats['todayDeaths']} today)\n"
        f"**Recoveries:** {stats['recovered']}"
    )

    await event.reply(response_text)
@ldr.add("yt", userlocking=True)
async def youtube_cmd(event):
    """Send the best available stream of a YouTube video to the chat.

    ``event.args`` is handed to ``pafy.new``. If ``ldr.cache`` reports that
    the stream URL needs caching, the file is cached locally, uploaded with
    video attributes (duration, dimensions, streaming support) and the cached
    copy is then removed. Otherwise the stream URL is sent directly.

    If sending fails, a message with the direct stream URL is sent instead.
    """
    if not event.args:
        await event.reply(f"Syntax: {ldr.prefix()}yt <url>")
        return

    video = pafy.new(event.args)
    video_stream = video.getbest()

    try:
        if await ldr.cache.is_cache_required(video_stream.url):
            file_path = await ldr.cache.cache_file(video_stream.url, f"{event.chat_id}_{event.id}")

            try:
                await event.client.send_file(event.chat, file=file_path, attributes=[
                    DocumentAttributeVideo(
                        duration=video.length,
                        w=video_stream.dimensions[0],
                        h=video_stream.dimensions[1],
                        supports_streaming=True
                    )])
            finally:
                # Remove the cached file even when the upload fails.
                ldr.cache.remove_cache(file_path)
        else:
            await event.reply(file=video_stream.url)
    except Exception:
        # Deliberately not a bare "except:": task cancellation and
        # KeyboardInterrupt must propagate rather than be reported as a
        # failed download.
        await event.reply(f"Download failed: [URL]({video_stream.url})")
@ldr.add("yta", userlocking=True)
async def youtube_audio_cmd(event):
    """Send the best available audio stream of a YouTube video to the chat.

    ``event.args`` is handed to ``pafy.new`` and the URL of its best audio
    stream is sent as a file. If sending fails, a message with the direct
    stream URL is sent instead.
    """
    if not event.args:
        await event.reply(f"Syntax: {ldr.prefix()}yta <url>")
        return

    video = pafy.new(event.args)
    video_stream = video.getbestaudio()

    try:
        await event.reply(file=video_stream.url)
    except Exception:
        # Deliberately not a bare "except:": task cancellation and
        # KeyboardInterrupt must propagate rather than be reported as a
        # failed download.
        await event.reply(f"Download failed: [URL]({video_stream.url})")