2
0
mirror of https://github.com/pyrogram/pyrogram synced 2025-08-29 13:27:47 +00:00

Merge branch 'develop' into asyncio

# Conflicts:
#	pyrogram/client/ext/dispatcher.py
This commit is contained in:
Dan 2019-06-22 00:49:13 +02:00
commit 84278f9cee
2 changed files with 22 additions and 8 deletions

View File

@ -59,11 +59,11 @@ class Dispatcher:
self.workers = workers self.workers = workers
self.update_worker_tasks = [] self.update_worker_tasks = []
self.locks_list = []
self.updates_queue = asyncio.Queue() self.updates_queue = asyncio.Queue()
self.groups = OrderedDict() self.groups = OrderedDict()
self.lock = asyncio.Lock()
async def message_parser(update, users, chats): async def message_parser(update, users, chats):
return await pyrogram.Message._parse(self.client, update.message, users, chats), MessageHandler return await pyrogram.Message._parse(self.client, update.message, users, chats), MessageHandler
@ -95,8 +95,10 @@ class Dispatcher:
async def start(self): async def start(self):
for i in range(self.workers): for i in range(self.workers):
self.locks_list.append(asyncio.Lock())
self.update_worker_tasks.append( self.update_worker_tasks.append(
asyncio.ensure_future(self.update_worker()) asyncio.ensure_future(self.update_worker(self.locks_list[-1]))
) )
log.info("Started {} UpdateWorkerTasks".format(self.workers)) log.info("Started {} UpdateWorkerTasks".format(self.workers))
@ -115,26 +117,38 @@ class Dispatcher:
def add_handler(self, handler, group: int): def add_handler(self, handler, group: int):
async def fn(): async def fn():
async with self.lock: for lock in self.locks_list:
await lock.acquire()
try:
if group not in self.groups: if group not in self.groups:
self.groups[group] = [] self.groups[group] = []
self.groups = OrderedDict(sorted(self.groups.items())) self.groups = OrderedDict(sorted(self.groups.items()))
self.groups[group].append(handler) self.groups[group].append(handler)
finally:
for lock in self.locks_list:
lock.release()
asyncio.get_event_loop().run_until_complete(fn()) asyncio.get_event_loop().run_until_complete(fn())
def remove_handler(self, handler, group: int): def remove_handler(self, handler, group: int):
async def fn(): async def fn():
async with self.lock: for lock in self.locks_list:
await lock.acquire()
try:
if group not in self.groups: if group not in self.groups:
raise ValueError("Group {} does not exist. Handler was not removed.".format(group)) raise ValueError("Group {} does not exist. Handler was not removed.".format(group))
self.groups[group].remove(handler) self.groups[group].remove(handler)
finally:
for lock in self.locks_list:
lock.release()
asyncio.get_event_loop().run_until_complete(fn()) asyncio.get_event_loop().run_until_complete(fn())
async def update_worker(self): async def update_worker(self, lock):
while True: while True:
packet = await self.updates_queue.get() packet = await self.updates_queue.get()
@ -151,7 +165,7 @@ class Dispatcher:
else (None, type(None)) else (None, type(None))
) )
async with self.lock: async with lock:
for group in self.groups.values(): for group in self.groups.values():
for handler in group: for handler in group:
args = None args = None

View File

@ -106,7 +106,7 @@ class SendVideoNote(BaseClient):
Returns: Returns:
:obj:`Message` | ``None``: On success, the sent video note message is returned, otherwise, in case the :obj:`Message` | ``None``: On success, the sent video note message is returned, otherwise, in case the
pload is deliberately stopped with :meth:`~Client.stop_transmission`, None is returned. upload is deliberately stopped with :meth:`~Client.stop_transmission`, None is returned.
Raises: Raises:
RPCError: In case of a Telegram RPC error. RPCError: In case of a Telegram RPC error.