2
0
mirror of https://github.com/pyrogram/pyrogram synced 2025-08-29 13:27:47 +00:00

Move the automatic sleep mechanism down to Session

This commit is contained in:
Dan 2020-05-07 13:38:22 +02:00
parent 99aee987bd
commit 128ab4b0b9
2 changed files with 41 additions and 33 deletions

View File

@ -185,7 +185,7 @@ class Client(Methods, BaseClient):
plugins: dict = None, plugins: dict = None,
no_updates: bool = None, no_updates: bool = None,
takeout: bool = None, takeout: bool = None,
sleep_threshold: int = 60 sleep_threshold: int = Session.SLEEP_THRESHOLD
): ):
super().__init__() super().__init__()
@ -1409,31 +1409,13 @@ class Client(Methods, BaseClient):
if not self.is_connected: if not self.is_connected:
raise ConnectionError("Client has not been started yet") raise ConnectionError("Client has not been started yet")
# Some raw methods that expect a query as argument are used here.
# Keep the original request query because is needed.
unwrapped_data = data
if self.no_updates: if self.no_updates:
data = functions.InvokeWithoutUpdates(query=data) data = functions.InvokeWithoutUpdates(query=data)
if self.takeout_id: if self.takeout_id:
data = functions.InvokeWithTakeout(takeout_id=self.takeout_id, query=data) data = functions.InvokeWithTakeout(takeout_id=self.takeout_id, query=data)
while True: r = self.session.send(data, retries, timeout, self.sleep_threshold)
try:
r = self.session.send(data, retries, timeout)
except FloodWait as e:
amount = e.x
if amount > self.sleep_threshold:
raise
log.warning('[{}] Sleeping for {}s (required by "{}")'.format(
self.session_name, amount, ".".join(unwrapped_data.QUALNAME.split(".")[1:])))
time.sleep(amount)
else:
break
self.fetch_peers(getattr(r, "users", [])) self.fetch_peers(getattr(r, "users", []))
self.fetch_peers(getattr(r, "chats", [])) self.fetch_peers(getattr(r, "chats", []))

View File

@ -33,7 +33,7 @@ from pyrogram.api.all import layer
from pyrogram.api.core import Message, TLObject, MsgContainer, Long, FutureSalt, Int from pyrogram.api.core import Message, TLObject, MsgContainer, Long, FutureSalt, Int
from pyrogram.connection import Connection from pyrogram.connection import Connection
from pyrogram.crypto import AES, KDF from pyrogram.crypto import AES, KDF
from pyrogram.errors import RPCError, InternalServerError, AuthKeyDuplicated from pyrogram.errors import RPCError, InternalServerError, AuthKeyDuplicated, FloodWait
from .internals import MsgId, MsgFactory from .internals import MsgId, MsgFactory
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -50,6 +50,7 @@ class Session:
NET_WORKERS = 1 NET_WORKERS = 1
START_TIMEOUT = 1 START_TIMEOUT = 1
WAIT_TIMEOUT = 15 WAIT_TIMEOUT = 15
SLEEP_THRESHOLD = 60
MAX_RETRIES = 5 MAX_RETRIES = 5
ACKS_THRESHOLD = 8 ACKS_THRESHOLD = 8
PING_INTERVAL = 5 PING_INTERVAL = 5
@ -432,19 +433,44 @@ class Session:
else: else:
return result return result
def send(self, data: TLObject, retries: int = MAX_RETRIES, timeout: float = WAIT_TIMEOUT): def send(
self,
data: TLObject,
retries: int = MAX_RETRIES,
timeout: float = WAIT_TIMEOUT,
sleep_threshold: float = SLEEP_THRESHOLD
):
self.is_connected.wait(self.WAIT_TIMEOUT) self.is_connected.wait(self.WAIT_TIMEOUT)
if isinstance(data, (functions.InvokeWithoutUpdates, functions.InvokeWithTakeout)):
query = data.query
else:
query = data
query = ".".join(query.QUALNAME.split(".")[1:])
while True:
try: try:
return self._send(data, timeout=timeout) return self._send(data, timeout=timeout)
except FloodWait as e:
amount = e.x
if amount > sleep_threshold:
raise
log.warning('[{}] Sleeping for {}s (required by "{}")'.format(
self.client.session_name, amount, query))
time.sleep(amount)
except (OSError, TimeoutError, InternalServerError) as e: except (OSError, TimeoutError, InternalServerError) as e:
if retries == 0: if retries == 0:
raise e from None raise e from None
(log.warning if retries < 2 else log.info)( (log.warning if retries < 2 else log.info)(
"[{}] Retrying {} due to {}".format( '[{}] Retrying "{}" due to {}'.format(
Session.MAX_RETRIES - retries + 1, Session.MAX_RETRIES - retries + 1,
data.QUALNAME, e)) query, e))
time.sleep(0.5) time.sleep(0.5)
return self.send(data, retries - 1, timeout) return self.send(data, retries - 1, timeout)