diff --git a/pyrogram/session/session.py b/pyrogram/session/session.py index 37a42ec6..f8cdcc19 100644 --- a/pyrogram/session/session.py +++ b/pyrogram/session/session.py @@ -259,63 +259,60 @@ class Session: break try: - self.unpack_dispatch_and_ack(packet) + data = self.unpack(BytesIO(packet)) + + messages = ( + data.body.messages + if isinstance(data.body, MsgContainer) + else [data] + ) + + log.debug(data) + + for msg in messages: + if msg.seq_no % 2 != 0: + if msg.msg_id in self.pending_acks: + continue + else: + self.pending_acks.add(msg.msg_id) + + if isinstance(msg.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)): + self.pending_acks.add(msg.body.answer_msg_id) + continue + + if isinstance(msg.body, types.NewSessionCreated): + continue + + msg_id = None + + if isinstance(msg.body, (types.BadMsgNotification, types.BadServerSalt)): + msg_id = msg.body.bad_msg_id + elif isinstance(msg.body, (core.FutureSalts, types.RpcResult)): + msg_id = msg.body.req_msg_id + elif isinstance(msg.body, types.Pong): + msg_id = msg.body.msg_id + else: + if self.client is not None: + self.client.updates_queue.put(msg.body) + + if msg_id in self.results: + self.results[msg_id].value = getattr(msg.body, "result", msg.body) + self.results[msg_id].event.set() + + if len(self.pending_acks) >= self.ACKS_THRESHOLD: + log.info("Send {} acks".format(len(self.pending_acks))) + + try: + self._send(types.MsgsAck(list(self.pending_acks)), False) + except (OSError, TimeoutError): + pass + else: + self.pending_acks.clear() except Exception as e: log.error(e, exc_info=True) log.debug("{} stopped".format(name)) - def unpack_dispatch_and_ack(self, packet: bytes): - data = self.unpack(BytesIO(packet)) - - messages = ( - data.body.messages - if isinstance(data.body, MsgContainer) - else [data] - ) - - log.debug(data) - - for msg in messages: - if msg.seq_no % 2 != 0: - if msg.msg_id in self.pending_acks: - continue - else: - self.pending_acks.add(msg.msg_id) - - if isinstance(msg.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)): - self.pending_acks.add(msg.body.answer_msg_id) - continue - - if isinstance(msg.body, types.NewSessionCreated): - continue - - msg_id = None - - if isinstance(msg.body, (types.BadMsgNotification, types.BadServerSalt)): - msg_id = msg.body.bad_msg_id - elif isinstance(msg.body, (core.FutureSalts, types.RpcResult)): - msg_id = msg.body.req_msg_id - elif isinstance(msg.body, types.Pong): - msg_id = msg.body.msg_id - else: - if self.client is not None: - self.client.updates_queue.put(msg.body) - - if msg_id in self.results: - self.results[msg_id].value = getattr(msg.body, "result", msg.body) - self.results[msg_id].event.set() - - if len(self.pending_acks) >= self.ACKS_THRESHOLD: - log.info("Send {} acks".format(len(self.pending_acks))) - - try: - self._send(types.MsgsAck(list(self.pending_acks)), False) - except (OSError, TimeoutError): - pass - else: - self.pending_acks.clear() - def ping(self): log.debug("PingThread started")