diff --git a/pyrogram/connection/connection.py b/pyrogram/connection/connection.py index 618c92a5..73c2312f 100644 --- a/pyrogram/connection/connection.py +++ b/pyrogram/connection/connection.py @@ -57,7 +57,7 @@ class Connection: await self.protocol.connect(self.address) except OSError as e: log.warning(f"Unable to connect due to network issues: {e}") - self.protocol.close() + await self.protocol.close() await asyncio.sleep(1) else: log.info("Connected! {} DC{}{} - IPv{} - {}".format( @@ -72,8 +72,8 @@ class Connection: log.warning("Connection failed! Trying again...") raise TimeoutError - def close(self): - self.protocol.close() + async def close(self): + await self.protocol.close() log.info("Disconnected") async def send(self, data: bytes): diff --git a/pyrogram/connection/transport/tcp/tcp.py b/pyrogram/connection/transport/tcp/tcp.py index c0efb625..4524c7a8 100644 --- a/pyrogram/connection/transport/tcp/tcp.py +++ b/pyrogram/connection/transport/tcp/tcp.py @@ -21,7 +21,7 @@ import ipaddress import logging import socket import time -from concurrent.futures import ThreadPoolExecutor +from typing import Optional try: import socks @@ -76,17 +76,21 @@ class TCP: else socket.AF_INET ) + self.socket.setblocking(False) self.socket.settimeout(TCP.TIMEOUT) + self.send_queue = asyncio.Queue() + self.send_task = None + async def connect(self, address: tuple): - # The socket used by the whole logic is blocking and thus it blocks when connecting. - # Offload the task to a thread executor to avoid blocking the main event loop. - with ThreadPoolExecutor(1) as executor: - await self.loop.run_in_executor(executor, self.socket.connect, address) - + await asyncio.get_event_loop().sock_connect(self.socket, address) self.reader, self.writer = await asyncio.open_connection(sock=self.socket) + self.send_task = asyncio.create_task(self.send_worker()) + + async def close(self): + await self.send_queue.put(None) + await self.send_task - def close(self): try: self.writer.close() except AttributeError: @@ -100,8 +104,16 @@ class TCP: time.sleep(0.001) self.socket.close() - async def send(self, data: bytes): - async with self.lock: + async def send(self, data: Optional[bytes]): + await self.send_queue.put(data) + + async def send_worker(self): + while True: + data = await self.send_queue.get() + + if data is None: + break + self.writer.write(data) await self.writer.drain() diff --git a/pyrogram/session/auth.py b/pyrogram/session/auth.py index 7df4fede..0bb39398 100644 --- a/pyrogram/session/auth.py +++ b/pyrogram/session/auth.py @@ -258,4 +258,4 @@ class Auth: else: return auth_key finally: - self.connection.close() + await self.connection.close() diff --git a/pyrogram/session/session.py b/pyrogram/session/session.py index a6537bb9..5e5d93fa 100644 --- a/pyrogram/session/session.py +++ b/pyrogram/session/session.py @@ -157,7 +157,7 @@ class Session: self.ping_task_event.clear() - self.connection.close() + await self.connection.close() if self.network_task: await self.network_task