2
0
mirror of https://github.com/Nick80835/microbot synced 2025-09-03 16:05:49 +00:00

move a lot of commands to a scrapers module

This commit is contained in:
Nick80835
2020-06-17 13:38:36 -04:00
parent c38aeec342
commit 5d9dc5df43
6 changed files with 159 additions and 170 deletions

View File

@@ -1,29 +0,0 @@
# SPDX-License-Identifier: GPL-2.0-or-later
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
@ldr.add("corona")
async def corona(event):
if event.args:
async with ldr.aioclient.get(f"https://corona.lmao.ninja/v2/countries/{event.args}") as response:
if response.status == 200:
response = await response.json()
else:
await event.reply(f"An error occurred, response code: **{response.status}**")
return
response_text = f"Stats for **{response['country']}**\n\n**Cases:** {response['cases']} ({response['todayCases']} today)\n**Deaths:** {response['deaths']} ({response['todayDeaths']} today)\n**Recoveries:** {response['recovered']}"
await event.reply(response_text)
else:
async with ldr.aioclient.get(f"https://corona.lmao.ninja/v2/all") as response:
if response.status == 200:
response = await response.json()
else:
await event.reply(f"`An error occurred, response code: `**{response.status}**")
return
response_text = f"Global stats\n\n**Cases:** {response['cases']} ({response['todayCases']} today)\n**Deaths:** {response['deaths']} ({response['todayDeaths']} today)\n**Recoveries:** {response['recovered']}"
await event.reply(response_text)

View File

@@ -2,9 +2,7 @@
import inspect import inspect
import io import io
from re import sub
from gtts import gTTS
from PIL import Image, ImageOps from PIL import Image, ImageOps
from speedtest import Speedtest from speedtest import Speedtest
@@ -43,70 +41,6 @@ def speed_convert(size):
return f"{round(size, 2)} {units[zero]}" return f"{round(size, 2)} {units[zero]}"
@ldr.add("tts")
async def text_to_speech(event):
text, reply = await ldr.get_text(event, return_msg=True)
if not text:
await event.reply("Give me text or reply to text to use TTS.")
return
tts_bytesio = io.BytesIO()
tts_bytesio.name = "tts.mp3"
try:
tts = gTTS(text, lang="EN")
tts.write_to_fp(tts_bytesio)
tts_bytesio.seek(0)
except AssertionError:
await event.reply('The text is empty.\nNothing left to speak after pre-precessing, tokenizing and cleaning.')
return
except RuntimeError:
await event.reply('Error loading the languages dictionary.')
return
await event.client.send_file(event.chat_id, tts_bytesio, voice_note=True, reply_to=reply)
@ldr.add("ip")
async def ip_lookup(event):
ip = await ldr.get_text(event)
if not ip:
await event.reply("Provide an IP!")
return
async with ldr.aioclient.get(f"http://ip-api.com/json/{ip}") as response:
if response.status == 200:
lookup_json = await response.json()
else:
await event.reply(f"An error occurred when looking for **{ip}**: **{response.status}**")
return
fixed_lookup = {}
for key, value in lookup_json.items():
special = {"lat": "Latitude", "lon": "Longitude", "isp": "ISP", "as": "AS", "asname": "AS name"}
if key in special:
fixed_lookup[special[key]] = str(value)
continue
key = sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", key)
key = key.capitalize()
if not value:
value = "None"
fixed_lookup[key] = str(value)
text = ""
for key, value in fixed_lookup.items():
text = text + f"**{key}:** {value}\n"
await event.reply(text)
@ldr.add("chatid") @ldr.add("chatid")
async def chatidgetter(event): async def chatidgetter(event):
if event.is_reply: if event.is_reply:

View File

@@ -1,23 +0,0 @@
# SPDX-License-Identifier: GPL-2.0-or-later
import os
import re
from howdoi import howdoi
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
os.environ["HOWDOI_SEARCH_ENGINE"] = "bing"
@ldr.add("hdi")
async def howdoi_cmd(event):
if not event.args:
await event.reply(f"Syntax: {ldr.settings.get_config('cmd_prefix') or '.'}hdi <question>")
return
response = howdoi.howdoi(vars(howdoi.get_parser().parse_args(event.args.split(' '))))
response = re.sub(r'\n\n+', '\n\n', response).strip()
await event.reply(f"**Query:**\n{event.args}\n**Answer:**\n{response}")

View File

@@ -124,30 +124,6 @@ async def yodafy(event):
await event.reply(yoda_text) await event.reply(yoda_text)
@ldr.add("dadjoke")
async def dadjoke(event):
async with ldr.aioclient.get("https://icanhazdadjoke.com/", headers={"Accept": "application/json"}) as response:
if response.status == 200:
dad_joke = (await response.json())["joke"]
else:
await event.reply(f"An error occured: **{response.status}**")
return
await event.reply(dad_joke)
@ldr.add("fact")
async def randomfact(event):
async with ldr.aioclient.get("https://uselessfacts.jsph.pl/random.json", params={"language": "en"}) as response:
if response.status == 200:
random_fact = (await response.json())["text"].replace("`", "'")
else:
await event.reply(f"An error occured: **{response.status}**")
return
await event.reply(random_fact)
async def shitpostify(text): async def shitpostify(text):
text = text.replace("dick", "peepee") text = text.replace("dick", "peepee")
text = text.replace("ck", "cc") text = text.replace("ck", "cc")

158
ubot/modules/scrapers.py Normal file
View File

@@ -0,0 +1,158 @@
# SPDX-License-Identifier: GPL-2.0-or-later
import io
import os
import re
import pafy
from gtts import gTTS
from howdoi import howdoi
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
os.environ["HOWDOI_SEARCH_ENGINE"] = "bing"
@ldr.add("dadjoke")
async def dadjoke(event):
async with ldr.aioclient.get("https://icanhazdadjoke.com/", headers={"Accept": "application/json"}) as response:
if response.status == 200:
dad_joke = (await response.json())["joke"]
else:
await event.reply(f"An error occured: **{response.status}**")
return
await event.reply(dad_joke)
@ldr.add("fact")
async def randomfact(event):
async with ldr.aioclient.get("https://uselessfacts.jsph.pl/random.json", params={"language": "en"}) as response:
if response.status == 200:
random_fact = (await response.json())["text"].replace("`", "'")
else:
await event.reply(f"An error occured: **{response.status}**")
return
await event.reply(random_fact)
@ldr.add("hdi")
async def howdoi_cmd(event):
if not event.args:
await event.reply(f"Syntax: {ldr.settings.get_config('cmd_prefix') or '.'}hdi <question>")
return
response = howdoi.howdoi(vars(howdoi.get_parser().parse_args(event.args.split(' '))))
response = re.sub(r'\n\n+', '\n\n', response).strip()
await event.reply(f"**Query:**\n{event.args}\n**Answer:**\n{response}")
@ldr.add("tts")
async def text_to_speech(event):
text, reply = await ldr.get_text(event, return_msg=True)
if not text:
await event.reply("Give me text or reply to text to use TTS.")
return
tts_bytesio = io.BytesIO()
tts_bytesio.name = "tts.mp3"
try:
tts = gTTS(text, lang="EN")
tts.write_to_fp(tts_bytesio)
tts_bytesio.seek(0)
except AssertionError:
await event.reply('The text is empty.\nNothing left to speak after pre-precessing, tokenizing and cleaning.')
return
except RuntimeError:
await event.reply('Error loading the languages dictionary.')
return
await event.client.send_file(event.chat_id, tts_bytesio, voice_note=True, reply_to=reply)
@ldr.add("ip")
async def ip_lookup(event):
ip = await ldr.get_text(event)
if not ip:
await event.reply("Provide an IP!")
return
async with ldr.aioclient.get(f"http://ip-api.com/json/{ip}") as response:
if response.status == 200:
lookup_json = await response.json()
else:
await event.reply(f"An error occurred when looking for **{ip}**: **{response.status}**")
return
fixed_lookup = {}
for key, value in lookup_json.items():
special = {"lat": "Latitude", "lon": "Longitude", "isp": "ISP", "as": "AS", "asname": "AS name"}
if key in special:
fixed_lookup[special[key]] = str(value)
continue
key = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", key)
key = key.capitalize()
if not value:
value = "None"
fixed_lookup[key] = str(value)
text = ""
for key, value in fixed_lookup.items():
text = text + f"**{key}:** {value}\n"
await event.reply(text)
@ldr.add("corona")
async def corona(event):
if event.args:
async with ldr.aioclient.get(f"https://corona.lmao.ninja/v2/countries/{event.args}") as response:
if response.status == 200:
response = await response.json()
else:
await event.reply(f"An error occurred, response code: **{response.status}**")
return
response_text = f"Stats for **{response['country']}**\n\n**Cases:** {response['cases']} ({response['todayCases']} today)\n**Deaths:** {response['deaths']} ({response['todayDeaths']} today)\n**Recoveries:** {response['recovered']}"
await event.reply(response_text)
else:
async with ldr.aioclient.get(f"https://corona.lmao.ninja/v2/all") as response:
if response.status == 200:
response = await response.json()
else:
await event.reply(f"`An error occurred, response code: `**{response.status}**")
return
response_text = f"Global stats\n\n**Cases:** {response['cases']} ({response['todayCases']} today)\n**Deaths:** {response['deaths']} ({response['todayDeaths']} today)\n**Recoveries:** {response['recovered']}"
await event.reply(response_text)
@ldr.add("yt", userlocking=True)
async def youtube_cmd(event):
video = pafy.new(event.args)
video_stream = video.getbest()
try:
await event.reply(file=video_stream.url)
except:
await event.reply(f"Download failed: [URL]({video_stream.url})")
@ldr.add("yta", userlocking=True)
async def youtube_audio_cmd(event):
video = pafy.new(event.args)
video_stream = video.getbestaudio()
try:
await event.reply(file=video_stream.url)
except:
await event.reply(f"Download failed: [URL]({video_stream.url})")

View File

@@ -1,27 +0,0 @@
# SPDX-License-Identifier: GPL-2.0-or-later
import pafy
from ubot.micro_bot import micro_bot
ldr = micro_bot.loader
@ldr.add("yt", userlocking=True)
async def youtube_cmd(event):
video = pafy.new(event.args)
video_stream = video.getbest()
try:
await event.reply(file=video_stream.url)
except:
await event.reply(f"Download failed: [URL]({video_stream.url})")
@ldr.add("yta", userlocking=True)
async def youtube_audio_cmd(event):
video = pafy.new(event.args)
video_stream = video.getbestaudio()
try:
await event.reply(file=video_stream.url)
except:
await event.reply(f"Download failed: [URL]({video_stream.url})")