mirror of
https://github.com/pyrogram/pyrogram
synced 2025-08-28 21:07:59 +00:00
Fix workers not running concurrently anymore after using a shared Lock
This commit is contained in:
parent
b439e44015
commit
e7fffd2f76
@ -61,11 +61,11 @@ class Dispatcher:
|
|||||||
self.workers = workers
|
self.workers = workers
|
||||||
|
|
||||||
self.workers_list = []
|
self.workers_list = []
|
||||||
|
self.locks_list = []
|
||||||
|
|
||||||
self.updates_queue = Queue()
|
self.updates_queue = Queue()
|
||||||
self.groups = OrderedDict()
|
self.groups = OrderedDict()
|
||||||
|
|
||||||
self.lock = Lock()
|
|
||||||
|
|
||||||
self.update_parsers = {
|
self.update_parsers = {
|
||||||
Dispatcher.MESSAGE_UPDATES:
|
Dispatcher.MESSAGE_UPDATES:
|
||||||
lambda upd, usr, cht: (pyrogram.Message._parse(self.client, upd.message, usr, cht), MessageHandler),
|
lambda upd, usr, cht: (pyrogram.Message._parse(self.client, upd.message, usr, cht), MessageHandler),
|
||||||
@ -92,10 +92,13 @@ class Dispatcher:
|
|||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
for i in range(self.workers):
|
for i in range(self.workers):
|
||||||
|
self.locks_list.append(Lock())
|
||||||
|
|
||||||
self.workers_list.append(
|
self.workers_list.append(
|
||||||
Thread(
|
Thread(
|
||||||
target=self.update_worker,
|
target=self.update_worker,
|
||||||
name="UpdateWorker#{}".format(i + 1)
|
name="UpdateWorker#{}".format(i + 1),
|
||||||
|
args=(self.locks_list[-1],)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -109,24 +112,37 @@ class Dispatcher:
|
|||||||
worker.join()
|
worker.join()
|
||||||
|
|
||||||
self.workers_list.clear()
|
self.workers_list.clear()
|
||||||
|
self.locks_list.clear()
|
||||||
self.groups.clear()
|
self.groups.clear()
|
||||||
|
|
||||||
def add_handler(self, handler, group: int):
|
def add_handler(self, handler, group: int):
|
||||||
with self.lock:
|
for lock in self.locks_list:
|
||||||
|
lock.acquire()
|
||||||
|
|
||||||
|
try:
|
||||||
if group not in self.groups:
|
if group not in self.groups:
|
||||||
self.groups[group] = []
|
self.groups[group] = []
|
||||||
self.groups = OrderedDict(sorted(self.groups.items()))
|
self.groups = OrderedDict(sorted(self.groups.items()))
|
||||||
|
|
||||||
self.groups[group].append(handler)
|
self.groups[group].append(handler)
|
||||||
|
finally:
|
||||||
|
for lock in self.locks_list:
|
||||||
|
lock.release()
|
||||||
|
|
||||||
def remove_handler(self, handler, group: int):
|
def remove_handler(self, handler, group: int):
|
||||||
with self.lock:
|
for lock in self.locks_list:
|
||||||
|
lock.acquire()
|
||||||
|
|
||||||
|
try:
|
||||||
if group not in self.groups:
|
if group not in self.groups:
|
||||||
raise ValueError("Group {} does not exist. Handler was not removed.".format(group))
|
raise ValueError("Group {} does not exist. Handler was not removed.".format(group))
|
||||||
|
|
||||||
self.groups[group].remove(handler)
|
self.groups[group].remove(handler)
|
||||||
|
finally:
|
||||||
|
for lock in self.locks_list:
|
||||||
|
lock.release()
|
||||||
|
|
||||||
def update_worker(self):
|
def update_worker(self, lock):
|
||||||
name = threading.current_thread().name
|
name = threading.current_thread().name
|
||||||
log.debug("{} started".format(name))
|
log.debug("{} started".format(name))
|
||||||
|
|
||||||
@ -146,7 +162,7 @@ class Dispatcher:
|
|||||||
else (None, type(None))
|
else (None, type(None))
|
||||||
)
|
)
|
||||||
|
|
||||||
with self.lock:
|
with lock:
|
||||||
for group in self.groups.values():
|
for group in self.groups.values():
|
||||||
for handler in group:
|
for handler in group:
|
||||||
args = None
|
args = None
|
||||||
|
Loading…
x
Reference in New Issue
Block a user