From 07c3d96d821f2c29c9b4a7e75634ada79deaa7bb Mon Sep 17 00:00:00 2001 From: Dan <14043624+delivrance@users.noreply.github.com> Date: Thu, 8 Feb 2018 21:59:08 +0100 Subject: [PATCH] Add Event Handler (for single updates) --- pyrogram/client/client.py | 40 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/pyrogram/client/client.py b/pyrogram/client/client.py index 863e83bb..fed9607b 100644 --- a/pyrogram/client/client.py +++ b/pyrogram/client/client.py @@ -103,6 +103,7 @@ class Client: INVITE_LINK_RE = re.compile(r"^(?:https?://)?t\.me/joinchat/(.+)$") DIALOGS_AT_ONCE = 100 + UPDATE_WORKERS = 2 def __init__(self, session_name: str, @@ -144,6 +145,7 @@ class Client: self.is_idle = Event() self.update_queue = Queue() + self.event_queue = Queue() def start(self): """Use this method to start the Client after creating it. @@ -176,9 +178,12 @@ class Client: self.rnd_id = self.session.msg_id self.get_dialogs() - for i in range(self.workers): + for i in range(self.UPDATE_WORKERS): Thread(target=self.update_worker, name="UpdateWorker#{}".format(i + 1)).start() + for i in range(self.workers): + Thread(target=self.event_worker, name="EventWorker#{}".format(i + 1)).start() + mimetypes.init() def stop(self): @@ -187,9 +192,12 @@ class Client: """ self.session.stop() - for i in range(self.workers): + for i in range(self.UPDATE_WORKERS): self.update_queue.put(None) + for i in range(self.workers): + self.event_queue.put(None) + def update_worker(self): name = threading.current_thread().name log.debug("{} started".format(name)) @@ -201,7 +209,33 @@ class Client: break try: - self.update_handler(self, update) + # TODO: Fetch users and chats + if isinstance(update, (types.Update, types.UpdatesCombined)): + for i in update.updates: + self.event_queue.put(i) + elif isinstance(update, (types.UpdateShortMessage, types.UpdateShortChatMessage)): + self.event_queue.put(update) + elif isinstance(update, types.UpdateShort): + self.event_queue.put(update.update) + else: + print(">>>>>", type(update)) + except Exception as e: + log.error(e, exc_info=True) + + log.debug("{} stopped".format(name)) + + def event_worker(self): + name = threading.current_thread().name + log.debug("{} started".format(name)) + + while True: + event = self.event_queue.get() + + if event is None: + break + + try: + self.update_handler(self, event) except Exception as e: log.error(e, exc_info=True)