From 761a07bda884887aeda0c7203c4bd5ce5dcd1af4 Mon Sep 17 00:00:00 2001
From: Dan <14043624+delivrance@users.noreply.github.com>
Date: Mon, 17 Dec 2018 16:41:06 +0100
Subject: [PATCH] Remove unneeded parts in utils.py
---
pyrogram/client/ext/utils.py | 979 +----------------------------------
1 file changed, 1 insertion(+), 978 deletions(-)
diff --git a/pyrogram/client/ext/utils.py b/pyrogram/client/ext/utils.py
index fac611f3..3e67e3cd 100644
--- a/pyrogram/client/ext/utils.py
+++ b/pyrogram/client/ext/utils.py
@@ -16,233 +16,9 @@
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see .
-import logging
from base64 import b64decode, b64encode
-from struct import pack
-from pyrogram.client import types as pyrogram_types
-from ...api import types, functions
-from ...api.errors import StickersetInvalid, MessageIdsEmpty
-
-log = logging.getLogger(__name__)
-
-
-# TODO: Organize the code better?
-
-class Str(str):
- __slots__ = "_client", "_entities"
-
- def __init__(self, *args):
- super().__init__()
- self._client = None
- self._entities = None
-
- def init(self, client, entities):
- self._client = client
- self._entities = entities
-
- @property
- def text(self):
- return self
-
- @property
- def markdown(self):
- return self._client.markdown.unparse(self, self._entities)
-
- @property
- def html(self):
- return self._client.html.unparse(self, self._entities)
-
-
-ENTITIES = {
- types.MessageEntityMention.ID: "mention",
- types.MessageEntityHashtag.ID: "hashtag",
- types.MessageEntityCashtag.ID: "cashtag",
- types.MessageEntityBotCommand.ID: "bot_command",
- types.MessageEntityUrl.ID: "url",
- types.MessageEntityEmail.ID: "email",
- types.MessageEntityBold.ID: "bold",
- types.MessageEntityItalic.ID: "italic",
- types.MessageEntityCode.ID: "code",
- types.MessageEntityPre.ID: "pre",
- types.MessageEntityTextUrl.ID: "text_link",
- types.MessageEntityMentionName.ID: "text_mention",
- types.MessageEntityPhone.ID: "phone_number"
-}
-
-
-def parse_entities(entities: list, users: dict) -> list:
- output_entities = []
-
- for entity in entities:
- entity_type = ENTITIES.get(entity.ID, None)
-
- if entity_type:
- output_entities.append(
- pyrogram_types.MessageEntity(
- type=entity_type,
- offset=entity.offset,
- length=entity.length,
- url=getattr(entity, "url", None),
- user=parse_user(
- users.get(
- getattr(entity, "user_id", None),
- None
- )
- )
- )
- )
-
- return output_entities
-
-
-def parse_chat_photo(photo):
- if not isinstance(photo, (types.UserProfilePhoto, types.ChatPhoto)):
- return None
-
- if not isinstance(photo.photo_small, types.FileLocation):
- return None
-
- if not isinstance(photo.photo_big, types.FileLocation):
- return None
-
- photo_id = getattr(photo, "photo_id", 0)
- loc_small = photo.photo_small
- loc_big = photo.photo_big
-
- return pyrogram_types.ChatPhoto(
- small_file_id=encode(
- pack(
- " pyrogram_types.UserStatus or None:
- if is_bot:
- return None
-
- status = pyrogram_types.UserStatus(user_id)
-
- if isinstance(user_status, types.UserStatusOnline):
- status.online = True
- status.date = user_status.expires
- elif isinstance(user_status, types.UserStatusOffline):
- status.offline = True
- status.date = user_status.was_online
- elif isinstance(user_status, types.UserStatusRecently):
- status.recently = True
- elif isinstance(user_status, types.UserStatusLastWeek):
- status.within_week = True
- elif isinstance(user_status, types.UserStatusLastMonth):
- status.within_month = True
- else:
- status.long_time_ago = True
-
- return status
-
-
-def parse_user(user: types.User) -> pyrogram_types.User or None:
- return pyrogram_types.User(
- id=user.id,
- is_self=user.is_self,
- is_contact=user.contact,
- is_mutual_contact=user.mutual_contact,
- is_deleted=user.deleted,
- is_bot=user.bot,
- first_name=user.first_name,
- last_name=user.last_name,
- username=user.username,
- language_code=user.lang_code,
- phone_number=user.phone,
- photo=parse_chat_photo(user.photo),
- status=parse_user_status(user.status, is_bot=user.bot),
- restriction_reason=user.restriction_reason
- ) if user else None
-
-
-def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat:
- if isinstance(message.to_id, types.PeerUser):
- return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id])
- elif isinstance(message.to_id, types.PeerChat):
- return parse_chat_chat(chats[message.to_id.chat_id])
- else:
- return parse_channel_chat(chats[message.to_id.channel_id])
-
-
-def parse_user_chat(user: types.User) -> pyrogram_types.Chat:
- return pyrogram_types.Chat(
- id=user.id,
- type="private",
- username=user.username,
- first_name=user.first_name,
- last_name=user.last_name,
- photo=parse_chat_photo(user.photo),
- restriction_reason=user.restriction_reason
- )
-
-
-def parse_chat_chat(chat: types.Chat) -> pyrogram_types.Chat:
- admins_enabled = getattr(chat, "admins_enabled", None)
-
- if admins_enabled is not None:
- admins_enabled = not admins_enabled
-
- return pyrogram_types.Chat(
- id=-chat.id,
- type="group",
- title=chat.title,
- all_members_are_administrators=admins_enabled,
- photo=parse_chat_photo(getattr(chat, "photo", None))
- )
-
-
-def parse_channel_chat(channel: types.Channel) -> pyrogram_types.Chat:
- return pyrogram_types.Chat(
- id=int("-100" + str(channel.id)),
- type="supergroup" if channel.megagroup else "channel",
- title=channel.title,
- username=getattr(channel, "username", None),
- photo=parse_chat_photo(getattr(channel, "photo", None)),
- restriction_reason=getattr(channel, "restriction_reason", None)
- )
-
-
-def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram_types.PhotoSize or None:
- if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)):
- loc = thumb.location
-
- if isinstance(thumb, types.PhotoSize):
- file_size = thumb.size
- else:
- file_size = len(thumb.bytes)
-
- if isinstance(loc, types.FileLocation):
- return pyrogram_types.PhotoSize(
- file_id=encode(
- pack(
- " bytes:
@@ -281,507 +57,6 @@ def encode(s: bytes) -> str:
return b64encode(r, b"-_").decode().rstrip("=")
-# TODO: Reorganize code, maybe split parts as well
-def parse_messages(
- client,
- messages: list or types.Message or types.MessageService or types.MessageEmpty,
- users: dict,
- chats: dict,
- replies: int = 1
-) -> pyrogram_types.Message or list:
- is_list = isinstance(messages, list)
- messages = messages if is_list else [messages]
- parsed_messages = []
-
- for message in messages:
- if isinstance(message, types.Message):
- entities = parse_entities(message.entities, users)
-
- forward_from = None
- forward_from_chat = None
- forward_from_message_id = None
- forward_signature = None
- forward_date = None
-
- forward_header = message.fwd_from # type: types.MessageFwdHeader
-
- if forward_header:
- forward_date = forward_header.date
-
- if forward_header.from_id:
- forward_from = parse_user(users[forward_header.from_id])
- else:
- forward_from_chat = parse_channel_chat(chats[forward_header.channel_id])
- forward_from_message_id = forward_header.channel_post
- forward_signature = forward_header.post_author
-
- photo = None
- location = None
- contact = None
- venue = None
- audio = None
- voice = None
- animation = None
- video = None
- video_note = None
- sticker = None
- document = None
- web_page = None
-
- media = message.media
-
- if media:
- if isinstance(media, types.MessageMediaPhoto):
- photo = media.photo
-
- if isinstance(photo, types.Photo):
- sizes = photo.sizes
- photo_sizes = []
-
- for size in sizes:
- if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)):
- loc = size.location
-
- if isinstance(size, types.PhotoSize):
- file_size = size.size
- else:
- file_size = len(size.bytes)
-
- if isinstance(loc, types.FileLocation):
- photo_size = pyrogram_types.PhotoSize(
- file_id=encode(
- pack(
- " pyrogram_types.Messages:
- messages = update.messages
- channel_id = getattr(update, "channel_id", None)
-
- parsed_messages = []
-
- for message in messages:
- parsed_messages.append(
- pyrogram_types.Message(
- message_id=message,
- chat=(pyrogram_types.Chat(id=int("-100" + str(channel_id)), type="channel")
- if channel_id is not None
- else None)
- )
- )
-
- return pyrogram_types.Messages(len(parsed_messages), parsed_messages)
-
-
def get_peer_id(input_peer) -> int:
return (
input_peer.user_id if isinstance(input_peer, types.InputPeerUser)
@@ -807,255 +82,3 @@ def get_offset_date(dialogs):
return m.date
else:
return 0
-
-
-def parse_profile_photos(photos):
- if isinstance(photos, types.photos.Photos):
- total_count = len(photos.photos)
- else:
- total_count = photos.count
-
- user_profile_photos = []
-
- for photo in photos.photos:
- if isinstance(photo, types.Photo):
- sizes = photo.sizes
- photo_sizes = []
-
- for size in sizes:
- if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)):
- loc = size.location
-
- if isinstance(size, types.PhotoSize):
- file_size = size.size
- else:
- file_size = len(size.bytes)
-
- if isinstance(loc, types.FileLocation):
- photo_size = pyrogram_types.PhotoSize(
- file_id=encode(
- pack(
- " pyrogram_types.Chat:
- if isinstance(chat_full, types.UserFull):
- parsed_chat = parse_user_chat(chat_full.user)
- parsed_chat.description = chat_full.about
- else:
- full_chat = chat_full.full_chat
- chat = None
-
- for i in chat_full.chats:
- if full_chat.id == i.id:
- chat = i
-
- if isinstance(full_chat, types.ChatFull):
- parsed_chat = parse_chat_chat(chat)
-
- if isinstance(full_chat.participants, types.ChatParticipants):
- parsed_chat.members_count = len(full_chat.participants.participants)
- else:
- parsed_chat = parse_channel_chat(chat)
- parsed_chat.members_count = full_chat.participants_count
- parsed_chat.description = full_chat.about or None
- # TODO: Add StickerSet type
- parsed_chat.can_set_sticker_set = full_chat.can_set_stickers
- parsed_chat.sticker_set_name = full_chat.stickerset
-
- if full_chat.pinned_msg_id:
- parsed_chat.pinned_message = client.get_messages(
- parsed_chat.id,
- message_ids=full_chat.pinned_msg_id
- )
-
- if isinstance(full_chat.exported_invite, types.ChatInviteExported):
- parsed_chat.invite_link = full_chat.exported_invite.link
-
- return parsed_chat
-
-
-def parse_dialog_chat(peer, users: dict, chats: dict):
- if isinstance(peer, types.PeerUser):
- return parse_user_chat(users[peer.user_id])
- elif isinstance(peer, types.PeerChat):
- return parse_chat_chat(chats[peer.chat_id])
- else:
- return parse_channel_chat(chats[peer.channel_id])
-
-
-def parse_chat_members(members: types.channels.ChannelParticipants or types.messages.ChatFull):
- users = {i.id: i for i in members.users}
- parsed_members = []
-
- if isinstance(members, types.channels.ChannelParticipants):
- count = members.count
- members = members.participants
-
- for member in members:
- user = parse_user(users[member.user_id])
-
- if isinstance(member, (types.ChannelParticipant, types.ChannelParticipantSelf)):
- parsed_members.append(
- pyrogram_types.ChatMember(
- user=user,
- status="member"
- )
- )
- elif isinstance(member, types.ChannelParticipantCreator):
- parsed_members.append(
- pyrogram_types.ChatMember(
- user=user,
- status="creator"
- )
- )
- elif isinstance(member, types.ChannelParticipantAdmin):
- rights = member.admin_rights # type: types.ChannelAdminRights
-
- parsed_members.append(
- pyrogram_types.ChatMember(
- user=user,
- status="administrator",
- can_be_edited=member.can_edit,
- can_change_info=rights.change_info,
- can_post_messages=rights.post_messages,
- can_edit_messages=rights.edit_messages,
- can_delete_messages=rights.delete_messages,
- can_invite_users=rights.invite_users or rights.invite_link,
- can_restrict_members=rights.ban_users,
- can_pin_messages=rights.pin_messages,
- can_promote_members=rights.add_admins
- )
- )
- elif isinstance(member, types.ChannelParticipantBanned):
- rights = member.banned_rights # type: types.ChannelBannedRights
-
- chat_member = pyrogram_types.ChatMember(
- user=user,
- status="kicked" if rights.view_messages else "restricted",
- until_date=0 if rights.until_date == (1 << 31) - 1 else rights.until_date
- )
-
- if chat_member.status == "restricted":
- chat_member.can_send_messages = not rights.send_messages
- chat_member.can_send_media_messages = not rights.send_media
- chat_member.can_send_other_messages = (
- not rights.send_stickers or not rights.send_gifs or
- not rights.send_games or not rights.send_inline
- )
- chat_member.can_add_web_page_previews = not rights.embed_links
-
- parsed_members.append(chat_member)
-
- return pyrogram_types.ChatMembers(
- total_count=count,
- chat_members=parsed_members
- )
- else:
- members = members.full_chat.participants.participants
-
- for member in members:
- user = parse_user(users[member.user_id])
-
- if isinstance(member, types.ChatParticipant):
- parsed_members.append(
- pyrogram_types.ChatMember(
- user=user,
- status="member"
- )
- )
- elif isinstance(member, types.ChatParticipantCreator):
- parsed_members.append(
- pyrogram_types.ChatMember(
- user=user,
- status="creator"
- )
- )
- elif isinstance(member, types.ChatParticipantAdmin):
- parsed_members.append(
- pyrogram_types.ChatMember(
- user=user,
- status="administrator"
- )
- )
-
- return pyrogram_types.ChatMembers(
- total_count=len(members),
- chat_members=parsed_members
- )