2
0
mirror of https://github.com/pyrogram/pyrogram synced 2025-08-23 10:28:00 +00:00

Add a watchdog for incoming updates

This commit is contained in:
Dan 2022-12-23 15:40:56 +01:00
parent aeea07f83d
commit a9e7d15bf6
3 changed files with 35 additions and 0 deletions

View File

@ -26,6 +26,7 @@ import re
import shutil import shutil
import sys import sys
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from datetime import datetime, timedelta
from hashlib import sha256 from hashlib import sha256
from importlib import import_module from importlib import import_module
from io import StringIO, BytesIO from io import StringIO, BytesIO
@ -185,6 +186,9 @@ class Client(Methods):
WORKERS = min(32, (os.cpu_count() or 0) + 4) # os.cpu_count() can be None WORKERS = min(32, (os.cpu_count() or 0) + 4) # os.cpu_count() can be None
WORKDIR = PARENT_DIR WORKDIR = PARENT_DIR
# Interval of seconds in which the updates watchdog will kick in
UPDATES_WATCHDOG_INTERVAL = 5 * 60
mimetypes = MimeTypes() mimetypes = MimeTypes()
mimetypes.readfp(StringIO(mime_types)) mimetypes.readfp(StringIO(mime_types))
@ -273,6 +277,13 @@ class Client(Methods):
self.message_cache = Cache(10000) self.message_cache = Cache(10000)
# Sometimes, for some reason, the server will stop sending updates and will only respond to pings.
# This watchdog will invoke updates.GetState in order to wake up the server and enable it sending updates again
# after some idle time has been detected.
self.updates_watchdog_task = None
self.updates_watchdog_event = asyncio.Event()
self.last_update_time = datetime.now()
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
def __enter__(self): def __enter__(self):
@ -293,6 +304,18 @@ class Client(Methods):
except ConnectionError: except ConnectionError:
pass pass
async def updates_watchdog(self):
while True:
try:
await asyncio.wait_for(self.updates_watchdog_event.wait(), self.UPDATES_WATCHDOG_INTERVAL)
except asyncio.TimeoutError:
pass
else:
break
if datetime.now() - self.last_update_time > timedelta(seconds=self.UPDATES_WATCHDOG_INTERVAL):
await self.invoke(raw.functions.updates.GetState())
async def authorize(self) -> User: async def authorize(self) -> User:
if self.bot_token: if self.bot_token:
return await self.sign_in_bot(self.bot_token) return await self.sign_in_bot(self.bot_token)
@ -484,6 +507,8 @@ class Client(Methods):
return is_min return is_min
async def handle_updates(self, updates): async def handle_updates(self, updates):
self.last_update_time = datetime.now()
if isinstance(updates, (raw.types.Updates, raw.types.UpdatesCombined)): if isinstance(updates, (raw.types.Updates, raw.types.UpdatesCombined)):
is_min = any(( is_min = any((
await self.fetch_peers(updates.users), await self.fetch_peers(updates.users),

View File

@ -16,6 +16,7 @@
# You should have received a copy of the GNU Lesser General Public License # You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>. # along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging import logging
import pyrogram import pyrogram
@ -46,4 +47,6 @@ class Initialize:
await self.dispatcher.start() await self.dispatcher.start()
self.updates_watchdog_task = asyncio.create_task(self.updates_watchdog())
self.is_initialized = True self.is_initialized = True

View File

@ -51,4 +51,11 @@ class Terminate:
self.media_sessions.clear() self.media_sessions.clear()
self.updates_watchdog_event.set()
if self.updates_watchdog_task is not None:
await self.updates_watchdog_task
self.updates_watchdog_event.clear()
self.is_initialized = False self.is_initialized = False