From d8edfb38bf8ada12e24633b37168431600a7fa9c Mon Sep 17 00:00:00 2001 From: Dan <14043624+delivrance@users.noreply.github.com> Date: Thu, 8 Feb 2018 20:46:47 +0100 Subject: [PATCH] Move update handler into Client --- pyrogram/client/client.py | 36 ++++++++++++++++++++++++++++++++---- pyrogram/session/session.py | 15 +++++---------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/pyrogram/client/client.py b/pyrogram/client/client.py index 7c17701f..863e83bb 100644 --- a/pyrogram/client/client.py +++ b/pyrogram/client/client.py @@ -23,12 +23,14 @@ import math import mimetypes import os import re +import threading import time from collections import namedtuple from configparser import ConfigParser from hashlib import sha256, md5 +from queue import Queue from signal import signal, SIGINT, SIGTERM, SIGABRT -from threading import Event +from threading import Event, Thread from pyrogram.api import functions, types from pyrogram.api.core import Object @@ -141,6 +143,8 @@ class Client: self.update_handler = None self.is_idle = Event() + self.update_queue = Queue() + def start(self): """Use this method to start the Client after creating it. Requires no parameters. @@ -156,7 +160,8 @@ class Client: self.test_mode, self.proxy, self.auth_key, - self.config.api_id + self.config.api_id, + client=self ) terms = self.session.start() @@ -170,7 +175,9 @@ class Client: self.rnd_id = self.session.msg_id self.get_dialogs() - self.session.set_update_handler(self, self.update_handler) + + for i in range(self.workers): + Thread(target=self.update_worker, name="UpdateWorker#{}".format(i + 1)).start() mimetypes.init() @@ -180,6 +187,26 @@ class Client: """ self.session.stop() + for i in range(self.workers): + self.update_queue.put(None) + + def update_worker(self): + name = threading.current_thread().name + log.debug("{} started".format(name)) + + while True: + update = self.update_queue.get() + + if update is None: + break + + try: + self.update_handler(self, update) + except Exception as e: + log.error(e, exc_info=True) + + log.debug("{} stopped".format(name)) + def signal_handler(self, *args): self.stop() self.is_idle.set() @@ -261,7 +288,8 @@ class Client: self.test_mode, self.proxy, self.auth_key, - self.config.api_id + self.config.api_id, + client=self ) self.session.start() diff --git a/pyrogram/session/session.py b/pyrogram/session/session.py index db03f740..d438de85 100644 --- a/pyrogram/session/session.py +++ b/pyrogram/session/session.py @@ -74,7 +74,8 @@ class Session: proxy: type, auth_key: bytes, api_id: str, - is_cdn: bool = False): + is_cdn: bool = False, + client: pyrogram = None): if not Session.notice_displayed: print("Pyrogram v{}, {}".format(__version__, __copyright__)) print("Licensed under the terms of the " + __license__, end="\n\n") @@ -83,6 +84,7 @@ class Session: self.connection = Connection(DataCenter(dc_id, test_mode), proxy) self.api_id = api_id self.is_cdn = is_cdn + self.client = client self.auth_key = auth_key self.auth_key_id = sha1(auth_key).digest()[-8:] @@ -106,9 +108,6 @@ class Session: self.is_connected = Event() - self.client = None - self.update_handler = None - def start(self): terms = None @@ -236,10 +235,6 @@ class Session: log.debug("{} stopped".format(name)) - def set_update_handler(self, client: pyrogram, update_handler: callable): - self.client = client - self.update_handler = update_handler - def unpack_dispatch_and_ack(self, packet: bytes): data = self.unpack(BytesIO(packet)) @@ -274,8 +269,8 @@ class Session: elif isinstance(msg.body, types.Pong): msg_id = msg.body.msg_id else: - if self.update_handler: - self.update_handler(self.client, msg.body) + if self.client is not None: + self.client.update_queue.put(msg.body) if msg_id in self.results: self.results[msg_id].value = getattr(msg.body, "result", msg.body)