diff --git a/pyrogram/client/client.py b/pyrogram/client/client.py index 099a0cb6..b3cc7630 100644 --- a/pyrogram/client/client.py +++ b/pyrogram/client/client.py @@ -97,12 +97,12 @@ class Client: be an empty string: "" workers (:obj:`int`, optional): - Thread pool size for handling incoming events (updates). Defaults to 4. + Thread pool size for handling incoming updates. Defaults to 4. """ INVITE_LINK_RE = re.compile(r"^(?:https?://)?t\.me/joinchat/(.+)$") DIALOGS_AT_ONCE = 100 - UPDATE_WORKERS = 2 + UPDATES_WORKERS = 2 def __init__(self, session_name: str, @@ -144,9 +144,9 @@ class Client: self.is_idle = Event() - self.event_handler = None + self.updates_queue = Queue() self.update_queue = Queue() - self.event_queue = Queue() + self.update_handler = None def start(self): """Use this method to start the Client after creating it. @@ -179,11 +179,11 @@ class Client: self.rnd_id = self.session.msg_id self.get_dialogs() - for i in range(self.UPDATE_WORKERS): - Thread(target=self.update_worker, name="UpdateWorker#{}".format(i + 1)).start() + for i in range(self.UPDATES_WORKERS): + Thread(target=self.updates_worker, name="UpdatesWorker#{}".format(i + 1)).start() for i in range(self.workers): - Thread(target=self.event_worker, name="EventWorker#{}".format(i + 1)).start() + Thread(target=self.update_worker, name="UpdateWorker#{}".format(i + 1)).start() mimetypes.init() @@ -193,11 +193,11 @@ class Client: """ self.session.stop() - for i in range(self.UPDATE_WORKERS): - self.update_queue.put(None) + for _ in range(self.UPDATES_WORKERS): + self.updates_queue.put(None) - for i in range(self.workers): - self.event_queue.put(None) + for _ in range(self.workers): + self.update_queue.put(None) def fetch_peers(self, entities: list): for entity in entities: @@ -260,31 +260,31 @@ class Client: if username is not None: self.peers_by_username[username] = input_peer - def update_worker(self): + def updates_worker(self): name = threading.current_thread().name log.debug("{} started".format(name)) while True: - update = self.update_queue.get() + updates = self.updates_queue.get() - if update is None: + if updates is None: break try: - if isinstance(update, (types.Update, types.UpdatesCombined)): - self.fetch_peers(update.users) - self.fetch_peers(update.chats) + if isinstance(updates, (types.Update, types.UpdatesCombined)): + self.fetch_peers(updates.users) + self.fetch_peers(updates.chats) - for i in update.updates: + for update in updates.updates: channel_id = getattr( getattr( getattr( - i, "message", None + update, "message", None ), "to_id", None ), "channel_id", None - ) or getattr(i, "channel_id", None) + ) or getattr(update, "channel_id", None) - pts = getattr(i, "pts", None) + pts = getattr(update, "pts", None) if channel_id and pts: if channel_id not in self.channels_pts: @@ -298,12 +298,12 @@ class Client: if len(self.channels_pts[channel_id]) > 50: self.channels_pts[channel_id] = self.channels_pts[channel_id][25:] - self.event_queue.put((i, update.users, update.chats)) - elif isinstance(update, (types.UpdateShortMessage, types.UpdateShortChatMessage)): + self.update_queue.put((update, updates.users, updates.chats)) + elif isinstance(updates, (types.UpdateShortMessage, types.UpdateShortChatMessage)): diff = self.send( functions.updates.GetDifference( - pts=update.pts - update.pts_count, - date=update.date, + pts=updates.pts - updates.pts_count, + date=updates.date, qts=-1 ) ) @@ -311,39 +311,39 @@ class Client: self.fetch_peers(diff.users) self.fetch_peers(diff.chats) - self.event_queue.put(( + self.update_queue.put(( types.UpdateNewMessage( message=diff.new_messages[0], - pts=update.pts, - pts_count=update.pts_count + pts=updates.pts, + pts_count=updates.pts_count ), diff.users, diff.chats )) - elif isinstance(update, types.UpdateShort): - self.event_queue.put((update.update, [], [])) + elif isinstance(updates, types.UpdateShort): + self.update_queue.put((updates.update, [], [])) except Exception as e: log.error(e, exc_info=True) log.debug("{} stopped".format(name)) - def event_worker(self): + def update_worker(self): name = threading.current_thread().name log.debug("{} started".format(name)) while True: - event = self.event_queue.get() + update = self.update_queue.get() - if event is None: + if update is None: break try: - if self.event_handler: - self.event_handler( + if self.update_handler: + self.update_handler( self, - event[0], - {i.id: i for i in event[1]}, - {i.id: i for i in event[2]} + update[0], + {i.id: i for i in update[1]}, + {i.id: i for i in update[2]} ) except Exception as e: log.error(e, exc_info=True) @@ -368,15 +368,15 @@ class Client: self.is_idle.wait() - def set_event_handler(self, callback: callable): - """Use this method to set the event handler. + def set_update_handler(self, callback: callable): + """Use this method to set the update handler. Args: callback (:obj:`callable`): - A function that takes ``client, event`` as positional arguments. - It will be called when a new event is generated on your account. + A function that takes "client, update, users, chats" as positional arguments. + It will be called when a new update is received from the server. """ - self.event_handler = callback + self.update_handler = callback def send(self, data: Object): """Use this method to send Raw Function queries. diff --git a/pyrogram/session/session.py b/pyrogram/session/session.py index f47cbc24..8e56911f 100644 --- a/pyrogram/session/session.py +++ b/pyrogram/session/session.py @@ -270,7 +270,7 @@ class Session: msg_id = msg.body.msg_id else: if self.client is not None: - self.client.update_queue.put(msg.body) + self.client.updates_queue.put(msg.body) if msg_id in self.results: self.results[msg_id].value = getattr(msg.body, "result", msg.body)