2
0
mirror of https://github.com/pyrogram/pyrogram synced 2025-08-28 12:57:52 +00:00

Lock dispatcher groups. Fixes #255

This commit is contained in:
Dan 2019-06-21 01:53:17 +02:00
parent 0699bd31e5
commit 8d0e161b56

View File

@ -20,7 +20,7 @@ import logging
import threading
from collections import OrderedDict
from queue import Queue
from threading import Thread
from threading import Thread, Lock
import pyrogram
from pyrogram.api import types
@ -64,6 +64,8 @@ class Dispatcher:
self.updates_queue = Queue()
self.groups = OrderedDict()
self.lock = Lock()
self.update_parsers = {
Dispatcher.MESSAGE_UPDATES:
lambda upd, usr, cht: (pyrogram.Message._parse(self.client, upd.message, usr, cht), MessageHandler),
@ -110,17 +112,19 @@ class Dispatcher:
self.groups.clear()
def add_handler(self, handler, group: int):
if group not in self.groups:
self.groups[group] = []
self.groups = OrderedDict(sorted(self.groups.items()))
with self.lock:
if group not in self.groups:
self.groups[group] = []
self.groups = OrderedDict(sorted(self.groups.items()))
self.groups[group].append(handler)
self.groups[group].append(handler)
def remove_handler(self, handler, group: int):
if group not in self.groups:
raise ValueError("Group {} does not exist. Handler was not removed.".format(group))
with self.lock:
if group not in self.groups:
raise ValueError("Group {} does not exist. Handler was not removed.".format(group))
self.groups[group].remove(handler)
self.groups[group].remove(handler)
def update_worker(self):
name = threading.current_thread().name
@ -142,29 +146,30 @@ class Dispatcher:
else (None, type(None))
)
for group in self.groups.values():
for handler in group:
args = None
with self.lock:
for group in self.groups.values():
for handler in group:
args = None
if isinstance(handler, handler_type):
if handler.check(parsed_update):
args = (parsed_update,)
elif isinstance(handler, RawUpdateHandler):
args = (update, users, chats)
if isinstance(handler, handler_type):
if handler.check(parsed_update):
args = (parsed_update,)
elif isinstance(handler, RawUpdateHandler):
args = (update, users, chats)
if args is None:
continue
if args is None:
continue
try:
handler.callback(self.client, *args)
except pyrogram.StopPropagation:
raise
except pyrogram.ContinuePropagation:
continue
except Exception as e:
log.error(e, exc_info=True)
try:
handler.callback(self.client, *args)
except pyrogram.StopPropagation:
raise
except pyrogram.ContinuePropagation:
continue
except Exception as e:
log.error(e, exc_info=True)
break
break
except pyrogram.StopPropagation:
pass
except Exception as e: