diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index 43becc3955..439f7a3ed3 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -553,6 +553,232 @@ TEST_F(ThreadPoolTest, wait) { checkState(thread_pool, 0, 0); } +/// @brief test ThreadPool pause and resume +TEST_F(ThreadPoolTest, pauseAndResume) { + uint32_t items_count; + uint32_t thread_count; + CallBack call_back; + ThreadPool thread_pool; + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 4; + thread_count = 4; + // prepare setup + reset(thread_count); + + // create tasks which block thread pool threads until signaled by main + // thread to force all threads of the thread pool to run exactly one task + call_back = std::bind(&ThreadPoolTest::runAndWait, this); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling resume should resume threads + EXPECT_NO_THROW(thread_pool.resume()); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // as each thread pool thread is still waiting on main to unblock, each + // thread should have been registered in ids list + checkIds(items_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // signal thread pool tasks to continue + signalThreads(); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 64; + thread_count = 16; + // prepare setup + reset(thread_count); + + // create tasks which do not block the thread pool threads so that several + // tasks can be run on the same thread and some of the threads never even + // having a chance to run + call_back = std::bind(&ThreadPoolTest::run, this); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling pause should pause threads + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // prepare setup + reset(thread_count); + + // add items to paused thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling resume should resume threads + EXPECT_NO_THROW(thread_pool.resume()); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); +>>>>>>> 0d7199fdd8 ([#1599] implemented pause and resume) +} + /// @brief test ThreadPool max queue size TEST_F(ThreadPoolTest, maxQueueSize) { uint32_t items_count; diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 726c1c61bf..207c265d57 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -140,6 +140,20 @@ struct ThreadPool { return (queue_.wait(seconds)); } + /// @brief pause threads + /// + /// Used to pause threads so that they stop processing tasks + void pause() { + queue_.pause(); + } + + /// @brief resume threads + /// + /// Used to resume threads so that they start processing tasks + void resume() { + queue_.resume(); + } + /// @brief set maximum number of work items in the queue /// /// @param max_queue_size the maximum size (0 means unlimited) @@ -241,7 +255,7 @@ private: /// /// Creates the thread pool queue in 'disabled' state ThreadPoolQueue() - : enabled_(false), max_queue_size_(0), working_(0), + : enabled_(false), paused_(false), max_queue_size_(0), working_(0), stat10(0.), stat100(0.), stat1000(0.) { } @@ -355,6 +369,7 @@ private: wait_cv_.notify_all(); } cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); + pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);}); ++working_; if (!enabled_) { return (Item()); @@ -403,6 +418,23 @@ private: return (ret); } + /// @brief pause threads + /// + /// Used to pause threads so that they stop processing tasks + void pause() { + std::unique_lock lock(mutex_); + paused_ = true; + } + + /// @brief resume threads + /// + /// Used to resume threads so that they start processing tasks + void resume() { + std::unique_lock lock(mutex_); + paused_ = false; + pause_cv_.notify_all(); + } + /// @brief get queue length statistic /// /// @param which select the statistic (10, 100 or 1000) @@ -473,11 +505,19 @@ private: /// @brief condition variable used to wait for all items to be processed std::condition_variable wait_cv_; + /// @brief condition variable used to pause threads + std::condition_variable pause_cv_; + /// @brief the state of the queue /// The 'enabled' state corresponds to true value /// The 'disabled' state corresponds to false value std::atomic enabled_; + /// @brief the pause state of the queue + /// The 'paused' state corresponds to true value + /// The 'resumed' state corresponds to false value + std::atomic paused_; + /// @brief maximum number of work items in the queue /// (0 means unlimited) size_t max_queue_size_;