diff --git a/pyrogram/client/message_parser.py b/pyrogram/client/message_parser.py index 8890ee1a..39512414 100644 --- a/pyrogram/client/message_parser.py +++ b/pyrogram/client/message_parser.py @@ -1,552 +1 @@ -from base64 import b64encode, b64decode -from struct import pack - -import pyrogram -from pyrogram.api import types - -ENTITIES = { - types.MessageEntityMention.ID: "mention", - types.MessageEntityHashtag.ID: "hashtag", - types.MessageEntityBotCommand.ID: "bot_command", - types.MessageEntityUrl.ID: "url", - types.MessageEntityEmail.ID: "email", - types.MessageEntityBold.ID: "bold", - types.MessageEntityItalic.ID: "italic", - types.MessageEntityCode.ID: "code", - types.MessageEntityPre.ID: "pre", - types.MessageEntityTextUrl.ID: "text_link", - types.MessageEntityMentionName.ID: "text_mention" -} - - -def parse_entities(entities: list, users: dict) -> list: - output_entities = [] - - for entity in entities: - entity_type = ENTITIES.get(entity.ID, None) - - if entity_type: - output_entities.append(pyrogram.MessageEntity( - type=entity_type, - offset=entity.offset, - length=entity.length, - url=getattr(entity, "url", None), - user=parse_user( - users.get( - getattr(entity, "user_id", None), - None - ) - ) - )) - - return output_entities - - -def parse_user(user: types.User) -> pyrogram.User or None: - return pyrogram.User( - id=user.id, - is_bot=user.bot, - first_name=user.first_name, - last_name=user.last_name, - username=user.username, - language_code=user.lang_code - ) if user else None - - -def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram.Chat: - if isinstance(message.to_id, types.PeerUser): - return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id]) - elif isinstance(message.to_id, types.PeerChat): - return parse_chat_chat(chats[message.to_id.chat_id]) - else: - return parse_channel_chat(chats[message.to_id.channel_id]) - - -def parse_user_chat(user: types.User) -> pyrogram.Chat: - return pyrogram.Chat( - id=user.id, - type="private", - username=user.username, - first_name=user.first_name, - last_name=user.last_name - ) - - -def parse_chat_chat(chat: types.Chat) -> pyrogram.Chat: - return pyrogram.Chat( - id=-chat.id, - type="group", - title=chat.title, - all_members_are_administrators=not chat.admins_enabled - ) - - -def parse_channel_chat(channel: types.Channel) -> pyrogram.Chat: - return pyrogram.Chat( - id=int("-100" + str(channel.id)), - type="supergroup" if channel.megagroup else "channel", - title=channel.title, - username=channel.username - ) - - -def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram.PhotoSize or None: - if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)): - loc = thumb.location - - if isinstance(thumb, types.PhotoSize): - file_size = thumb.size - else: - file_size = len(thumb.bytes) - - if isinstance(loc, types.FileLocation): - return pyrogram.PhotoSize( - file_id=encode( - pack( - " pyrogram.Message: - entities = parse_entities(message.entities, users) - - forward_from = None - forward_from_chat = None - forward_from_message_id = None - forward_signature = None - forward_date = None - - forward_header = message.fwd_from # type: types.MessageFwdHeader - - if forward_header: - forward_date = forward_header.date - - if forward_header.from_id: - forward_from = parse_user(users[forward_header.from_id]) - else: - forward_from_chat = parse_channel_chat(chats[forward_header.channel_id]) - forward_from_message_id = forward_header.channel_post - forward_signature = forward_header.post_author - - photo = None - location = None - contact = None - venue = None - audio = None - voice = None - video = None - video_note = None - sticker = None - document = None - - media = message.media - - if media: - if isinstance(media, types.MessageMediaPhoto): - photo = media.photo - - if isinstance(photo, types.Photo): - sizes = photo.sizes - photo_sizes = [] - - for size in sizes: - if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): - loc = size.location - - if isinstance(size, types.PhotoSize): - file_size = size.size - else: - file_size = len(size.bytes) - - if isinstance(loc, types.FileLocation): - photo_size = pyrogram.PhotoSize( - file_id=encode( - pack( - " pyrogram.Message: - action = message.action - - new_chat_members = None - left_chat_member = None - new_chat_title = None - delete_chat_photo = None - migrate_to_chat_id = None - migrate_from_chat_id = None - group_chat_created = None - channel_chat_created = None - new_chat_photo = None - - if isinstance(action, types.MessageActionChatAddUser): - new_chat_members = [parse_user(users[i]) for i in action.users] - elif isinstance(action, types.MessageActionChatJoinedByLink): - new_chat_members = [parse_user(users[action.inviter_id])] - elif isinstance(action, types.MessageActionChatDeleteUser): - left_chat_member = parse_user(users[action.user_id]) - elif isinstance(action, types.MessageActionChatEditTitle): - new_chat_title = action.title - elif isinstance(action, types.MessageActionChatDeletePhoto): - delete_chat_photo = True - elif isinstance(action, types.MessageActionChatMigrateTo): - migrate_to_chat_id = action.channel_id - elif isinstance(action, types.MessageActionChannelMigrateFrom): - migrate_from_chat_id = action.chat_id - elif isinstance(action, types.MessageActionChatCreate): - group_chat_created = True - elif isinstance(action, types.MessageActionChannelCreate): - channel_chat_created = True - elif isinstance(action, types.MessageActionChatEditPhoto): - photo = action.photo - - if isinstance(photo, types.Photo): - sizes = photo.sizes - photo_sizes = [] - - for size in sizes: - if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): - loc = size.location - - if isinstance(size, types.PhotoSize): - file_size = size.size - else: - file_size = len(size.bytes) - - if isinstance(loc, types.FileLocation): - photo_size = pyrogram.PhotoSize( - file_id=encode( - pack( - " bytes: - s = b64decode(s + "=" * (-len(s) % 4), "-_") - r = b"" - - assert s[-1] == 2 - - i = 0 - while i < len(s) - 1: - if s[i] != 0: - r += bytes([s[i]]) - else: - r += b"\x00" * s[i + 1] - i += 1 - - i += 1 - - return r - - -def encode(s: bytes) -> str: - r = b"" - n = 0 - - for i in s + bytes([2]): - if i == 0: - n += 1 - else: - if n: - r += b"\x00" + bytes([n]) - n = 0 - - r += bytes([i]) - - return b64encode(r, b"-_").decode().rstrip("=") +from base64 import b64encode, b64decode from struct import pack import pyrogram from pyrogram.api import types # TODO: Organize the code better? ENTITIES = { types.MessageEntityMention.ID: "mention", types.MessageEntityHashtag.ID: "hashtag", types.MessageEntityBotCommand.ID: "bot_command", types.MessageEntityUrl.ID: "url", types.MessageEntityEmail.ID: "email", types.MessageEntityBold.ID: "bold", types.MessageEntityItalic.ID: "italic", types.MessageEntityCode.ID: "code", types.MessageEntityPre.ID: "pre", types.MessageEntityTextUrl.ID: "text_link", types.MessageEntityMentionName.ID: "text_mention" } def parse_entities(entities: list, users: dict) -> list: output_entities = [] for entity in entities: entity_type = ENTITIES.get(entity.ID, None) if entity_type: output_entities.append(pyrogram.MessageEntity( type=entity_type, offset=entity.offset, length=entity.length, url=getattr(entity, "url", None), user=parse_user( users.get( getattr(entity, "user_id", None), None ) ) )) return output_entities def parse_user(user: types.User) -> pyrogram.User or None: return pyrogram.User( id=user.id, is_bot=user.bot, first_name=user.first_name, last_name=user.last_name, username=user.username, language_code=user.lang_code ) if user else None def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram.Chat: if isinstance(message.to_id, types.PeerUser): return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id]) elif isinstance(message.to_id, types.PeerChat): return parse_chat_chat(chats[message.to_id.chat_id]) else: return parse_channel_chat(chats[message.to_id.channel_id]) def parse_user_chat(user: types.User) -> pyrogram.Chat: return pyrogram.Chat( id=user.id, type="private", username=user.username, first_name=user.first_name, last_name=user.last_name ) def parse_chat_chat(chat: types.Chat) -> pyrogram.Chat: return pyrogram.Chat( id=-chat.id, type="group", title=chat.title, all_members_are_administrators=not chat.admins_enabled ) def parse_channel_chat(channel: types.Channel) -> pyrogram.Chat: return pyrogram.Chat( id=int("-100" + str(channel.id)), type="supergroup" if channel.megagroup else "channel", title=channel.title, username=channel.username ) def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram.PhotoSize or None: if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)): loc = thumb.location if isinstance(thumb, types.PhotoSize): file_size = thumb.size else: file_size = len(thumb.bytes) if isinstance(loc, types.FileLocation): return pyrogram.PhotoSize( file_id=encode( pack( " pyrogram.Message: entities = parse_entities(message.entities, users) forward_from = None forward_from_chat = None forward_from_message_id = None forward_signature = None forward_date = None forward_header = message.fwd_from # type: types.MessageFwdHeader if forward_header: forward_date = forward_header.date if forward_header.from_id: forward_from = parse_user(users[forward_header.from_id]) else: forward_from_chat = parse_channel_chat(chats[forward_header.channel_id]) forward_from_message_id = forward_header.channel_post forward_signature = forward_header.post_author photo = None location = None contact = None venue = None audio = None voice = None video = None video_note = None sticker = None document = None media = message.media if media: if isinstance(media, types.MessageMediaPhoto): photo = media.photo if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram.PhotoSize( file_id=encode( pack( " pyrogram.Message: action = message.action new_chat_members = None left_chat_member = None new_chat_title = None delete_chat_photo = None migrate_to_chat_id = None migrate_from_chat_id = None group_chat_created = None channel_chat_created = None new_chat_photo = None if isinstance(action, types.MessageActionChatAddUser): new_chat_members = [parse_user(users[i]) for i in action.users] elif isinstance(action, types.MessageActionChatJoinedByLink): new_chat_members = [parse_user(users[action.inviter_id])] elif isinstance(action, types.MessageActionChatDeleteUser): left_chat_member = parse_user(users[action.user_id]) elif isinstance(action, types.MessageActionChatEditTitle): new_chat_title = action.title elif isinstance(action, types.MessageActionChatDeletePhoto): delete_chat_photo = True elif isinstance(action, types.MessageActionChatMigrateTo): migrate_to_chat_id = action.channel_id elif isinstance(action, types.MessageActionChannelMigrateFrom): migrate_from_chat_id = action.chat_id elif isinstance(action, types.MessageActionChatCreate): group_chat_created = True elif isinstance(action, types.MessageActionChannelCreate): channel_chat_created = True elif isinstance(action, types.MessageActionChatEditPhoto): photo = action.photo if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram.PhotoSize( file_id=encode( pack( " bytes: s = b64decode(s + "=" * (-len(s) % 4), "-_") r = b"" assert s[-1] == 2 i = 0 while i < len(s) - 1: if s[i] != 0: r += bytes([s[i]]) else: r += b"\x00" * s[i + 1] i += 1 i += 1 return r def encode(s: bytes) -> str: r = b"" n = 0 for i in s + bytes([2]): if i == 0: n += 1 else: if n: r += b"\x00" + bytes([n]) n = 0 r += bytes([i]) return b64encode(r, b"-_").decode().rstrip("=") \ No newline at end of file