2
0
mirror of https://github.com/pyrogram/pyrogram synced 2025-08-29 21:38:04 +00:00

Rewrite add_contacts and delete_contacts

This commit is contained in:
Dan 2018-02-20 15:03:35 +01:00
parent 2ab8fbe047
commit 3ff3f5d9f4

View File

@ -45,17 +45,14 @@ from pyrogram.api.errors import (
from pyrogram.api.types import (
User, Chat, Channel,
PeerUser, PeerChannel,
InputUser,
InputPeerEmpty, InputPeerSelf,
InputPeerUser, InputPeerChat, InputPeerChannel,
InputPhoneContact
InputPeerUser, InputPeerChat, InputPeerChannel
)
from pyrogram.crypto import AES
from pyrogram.session import Auth, Session
from pyrogram.session.internals import MsgId
from .input_media import InputMedia
from .style import Markdown, HTML
from typing import List, Union
log = logging.getLogger(__name__)
@ -2310,99 +2307,34 @@ class Client:
done.wait()
def add_contacts(self,
phone: Union[int, str] = None,
first_name: str = None,
last_name: str = None,
input_phone_contact_list: List[InputPhoneContact] = None):
if (phone is None or first_name is None) and \
input_phone_contact_list is None:
log.warning("(phone and first_name) or input_phone_contact_list "
"must be not None")
return None
if phone is not None and first_name is not None:
if str(phone)[0] != '+':
phone = '+' + str(phone)
input_phone_contact_list = []
input_phone_contact = InputPhoneContact(client_id=0,
phone=phone,
first_name=first_name,
last_name=last_name or '')
input_phone_contact_list.append(input_phone_contact)
# make sure that we send only InputPhoneContact
inner_input_phone_contact_list = []
for contact in input_phone_contact_list:
if isinstance(contact, InputPhoneContact):
inner_input_phone_contact_list.append(contact)
def add_contacts(self, contacts: list):
imported_contacts = self.send(
functions.contacts.ImportContacts(inner_input_phone_contact_list))
functions.contacts.ImportContacts(
contacts=contacts
)
)
for user in imported_contacts.users:
if isinstance(user, User):
if user.id in self.peers_by_id:
continue
if user.access_hash is None:
continue
input_peer = InputPeerUser(
user_id=user.id,
access_hash=user.access_hash
)
self.peers_by_id[user.id] = input_peer
if user.username is not None:
self.peers_by_username[user.username] = input_peer
self.fetch_peers(imported_contacts.users)
return imported_contacts
def delete_contacts(self, _id: int = None,
ids_list: Union[
List[int], List[InputUser]] = None):
if _id is None and ids_list is None:
log.warning('id or ids_list must be not None')
return False
def delete_contacts(self, ids: list):
contacts = []
contacts = self.get_contacts()
for i in ids:
try:
input_user = self.resolve_peer(i)
except PeerIdInvalid:
continue
else:
if isinstance(input_user, types.InputPeerUser):
contacts.append(input_user)
if _id is not None:
if not isinstance(_id, int):
log.warning('id is not int')
return False
input_user = None
for user in contacts.users:
if isinstance(user, User):
if _id == user.id:
input_user = InputUser(user_id=user.id,
access_hash=user.access_hash)
break
ids_list = [input_user]
inner_ids_list = []
for _id in ids_list:
if isinstance(_id, InputUser):
inner_ids_list.append(_id)
if isinstance(_id, int):
input_user = None
for user in contacts.users:
if isinstance(user, User):
if _id == user.id:
input_user = InputUser(
user_id=user.id,
access_hash=user.access_hash)
break
inner_ids_list.append(input_user)
res = self.send(functions.contacts.DeleteContacts(inner_ids_list))
return res
return self.send(
functions.contacts.DeleteContacts(
id=contacts
)
)
def get_contacts(self, _hash: int = 0):
return self.send(functions.contacts.GetContacts(_hash))