2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-31 22:15:23 +00:00

[#1599] implemented pause and resume

This commit is contained in:
Razvan Becheriu
2020-12-11 12:52:03 +02:00
parent 9437ae9b0a
commit ae44dc38df
2 changed files with 267 additions and 1 deletions

View File

@@ -553,6 +553,232 @@ TEST_F(ThreadPoolTest, wait) {
checkState(thread_pool, 0, 0);
}
/// @brief test ThreadPool pause and resume
TEST_F(ThreadPoolTest, pauseAndResume) {
uint32_t items_count;
uint32_t thread_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
items_count = 4;
thread_count = 4;
// prepare setup
reset(thread_count);
// create tasks which block thread pool threads until signaled by main
// thread to force all threads of the thread pool to run exactly one task
call_back = std::bind(&ThreadPoolTest::runAndWait, this);
// calling resume should do nothing if not started
EXPECT_NO_THROW(thread_pool.resume());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// do it once again to check if it works
EXPECT_NO_THROW(thread_pool.resume());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling pause should do nothing if not started
EXPECT_NO_THROW(thread_pool.pause());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// do it once again to check if it works
EXPECT_NO_THROW(thread_pool.pause());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling resume should do nothing if not started
EXPECT_NO_THROW(thread_pool.resume());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling pause should do nothing if not started
EXPECT_NO_THROW(thread_pool.pause());
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling resume should do nothing if not started
EXPECT_NO_THROW(thread_pool.resume());
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling pause should do nothing if not started
EXPECT_NO_THROW(thread_pool.pause());
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling start should create the threads and should keep the queued items
EXPECT_NO_THROW(thread_pool.start(thread_count));
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// calling resume should resume threads
EXPECT_NO_THROW(thread_pool.resume());
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// wait for all items to be processed
waitTasks(thread_count, items_count);
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// as each thread pool thread is still waiting on main to unblock, each
// thread should have been registered in ids list
checkIds(items_count);
// all items should have been processed
ASSERT_EQ(count(), items_count);
// check that the number of processed tasks matches the number of items
checkRunHistory(items_count);
// signal thread pool tasks to continue
signalThreads();
// do it once again to check if it works
EXPECT_NO_THROW(thread_pool.resume());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// calling stop should clear all threads and should keep queued items
EXPECT_NO_THROW(thread_pool.stop());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
items_count = 64;
thread_count = 16;
// prepare setup
reset(thread_count);
// create tasks which do not block the thread pool threads so that several
// tasks can be run on the same thread and some of the threads never even
// having a chance to run
call_back = std::bind(&ThreadPoolTest::run, this);
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
// calling start should create the threads and should keep the queued items
EXPECT_NO_THROW(thread_pool.start(thread_count));
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// wait for all items to be processed
waitTasks(thread_count, items_count);
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// all items should have been processed
ASSERT_EQ(count(), items_count);
// check that the number of processed tasks matches the number of items
checkRunHistory(items_count);
// calling pause should pause threads
EXPECT_NO_THROW(thread_pool.pause());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// do it once again to check if it works
EXPECT_NO_THROW(thread_pool.pause());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// prepare setup
reset(thread_count);
// add items to paused thread pool
for (uint32_t i = 0; i < items_count; ++i) {
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// calling resume should resume threads
EXPECT_NO_THROW(thread_pool.resume());
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// wait for all items to be processed
waitTasks(thread_count, items_count);
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
// all items should have been processed
ASSERT_EQ(count(), items_count);
// check that the number of processed tasks matches the number of items
checkRunHistory(items_count);
// calling stop should clear all threads and should keep queued items
EXPECT_NO_THROW(thread_pool.stop());
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
>>>>>>> 0d7199fdd8 ([#1599] implemented pause and resume)
}
/// @brief test ThreadPool max queue size
TEST_F(ThreadPoolTest, maxQueueSize) {
uint32_t items_count;

View File

@@ -140,6 +140,20 @@ struct ThreadPool {
return (queue_.wait(seconds));
}
/// @brief pause threads
///
/// Used to pause threads so that they stop processing tasks
void pause() {
queue_.pause();
}
/// @brief resume threads
///
/// Used to resume threads so that they start processing tasks
void resume() {
queue_.resume();
}
/// @brief set maximum number of work items in the queue
///
/// @param max_queue_size the maximum size (0 means unlimited)
@@ -241,7 +255,7 @@ private:
///
/// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue()
: enabled_(false), max_queue_size_(0), working_(0),
: enabled_(false), paused_(false), max_queue_size_(0), working_(0),
stat10(0.), stat100(0.), stat1000(0.) {
}
@@ -355,6 +369,7 @@ private:
wait_cv_.notify_all();
}
cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);});
++working_;
if (!enabled_) {
return (Item());
@@ -403,6 +418,23 @@ private:
return (ret);
}
/// @brief pause threads
///
/// Used to pause threads so that they stop processing tasks
void pause() {
std::unique_lock<std::mutex> lock(mutex_);
paused_ = true;
}
/// @brief resume threads
///
/// Used to resume threads so that they start processing tasks
void resume() {
std::unique_lock<std::mutex> lock(mutex_);
paused_ = false;
pause_cv_.notify_all();
}
/// @brief get queue length statistic
///
/// @param which select the statistic (10, 100 or 1000)
@@ -473,11 +505,19 @@ private:
/// @brief condition variable used to wait for all items to be processed
std::condition_variable wait_cv_;
/// @brief condition variable used to pause threads
std::condition_variable pause_cv_;
/// @brief the state of the queue
/// The 'enabled' state corresponds to true value
/// The 'disabled' state corresponds to false value
std::atomic<bool> enabled_;
/// @brief the pause state of the queue
/// The 'paused' state corresponds to true value
/// The 'resumed' state corresponds to false value
std::atomic<bool> paused_;
/// @brief maximum number of work items in the queue
/// (0 means unlimited)
size_t max_queue_size_;