From 1587fadf36cd8fe1a46d6464cebc4dbfb898fc1c Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Sat, 18 Nov 2023 18:43:36 +0200 Subject: [PATCH] [#3142] fixed deadlock --- src/lib/util/tests/thread_pool_unittest.cc | 6 ++--- src/lib/util/thread_pool.h | 29 ++++++++++++++++++---- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index 9c367c914c..4fa88d9265 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -526,7 +526,7 @@ TEST_F(ThreadPoolTest, wait) { checkState(thread_pool, 0, 0); items_count = 16; - thread_count = 16; + thread_count = 256; // prepare setup reset(thread_count); @@ -546,8 +546,8 @@ TEST_F(ThreadPoolTest, wait) { // calling stop should clear all threads and should keep queued items EXPECT_NO_THROW(thread_pool.stop()); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); + checkState(thread_pool, 0, 0); + // wait for all items to be processed ASSERT_TRUE(thread_pool.wait(1)); checkState(thread_pool, 0, 0); diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 7a090a60a8..6807f530ab 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -278,7 +278,7 @@ private: /// Creates the thread pool queue in 'disabled' state ThreadPoolQueue() : enabled_(false), paused_(false), max_queue_size_(0), working_(0), - stat10(0.), stat100(0.), stat1000(0.) { + unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) { } /// @brief Destructor @@ -289,6 +289,20 @@ private: clear(); } + /// @brief register thread so that it can be taken into account + void registerThread() { + std::lock_guard lock(mutex_); + ++working_; + --unavailable_; + } + + /// @brief unregister thread so that it can be ignored + void unregisterThread() { + std::lock_guard lock(mutex_); + --working_; + ++unavailable_; + } + /// @brief set maximum number of work items in the queue /// /// @return the maximum size (0 means unlimited) @@ -377,7 +391,7 @@ private: std::unique_lock lock(mutex_); --working_; // Signal thread waiting for threads to pause. - if (working_ == 0 && paused_) { + if (paused_ && working_ == 0 && unavailable_ == 0) { wait_threads_cv_.notify_all(); } // Signal thread waiting for tasks to finish. @@ -386,10 +400,10 @@ private: } // Wait for push or disable functions. cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));}); + ++working_; if (!enabled_) { return (Item()); } - ++working_; size_t length = queue_.size(); stat10 = stat10 * CEXP10 + (1 - CEXP10) * length; stat100 = stat100 * CEXP100 + (1 - CEXP100) * length; @@ -444,7 +458,7 @@ private: paused_ = true; if (wait) { // Wait for working threads to finish. - wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);}); + wait_threads_cv_.wait(lock, [&]() {return (working_ == 0 && unavailable_ == 0);}); } } @@ -493,7 +507,7 @@ private: void enable(uint32_t thread_count) { std::lock_guard lock(mutex_); enabled_ = true; - working_ = thread_count; + unavailable_ = thread_count; } /// @brief disable the queue @@ -562,6 +576,9 @@ private: /// @brief number of threads currently doing work uint32_t working_; + /// @brief number of threads not running + uint32_t unavailable_; + /// @brief queue length statistic for 10 packets double stat10; @@ -574,6 +591,7 @@ private: /// @brief run function of each thread void run() { + queue_.registerThread(); for (bool work = true; work; work = queue_.enabled()) { WorkItemPtr item = queue_.pop(); if (item) { @@ -584,6 +602,7 @@ private: } } } + queue_.unregisterThread(); } /// @brief list of worker threads