2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-10-17 14:26:31 +00:00

[#1173] Addressed comments

This commit is contained in:
Francis Dupont
2020-05-06 21:40:08 +02:00
committed by Razvan Becheriu
parent 99ad85b77b
commit c8b21c81a6
14 changed files with 84 additions and 44 deletions

View File

@@ -1,4 +1,4 @@
// File created from ../../../src/bin/dhcp4/dhcp4_messages.mes on Thu Apr 30 2020 14:12
// File created from ../../../src/bin/dhcp4/dhcp4_messages.mes on Wed May 06 2020 20:50
#include <cstddef>
#include <log/message_types.h>
@@ -111,6 +111,7 @@ extern const isc::log::MessageID DHCP4_PACKET_PACK = "DHCP4_PACKET_PACK";
extern const isc::log::MessageID DHCP4_PACKET_PACK_FAIL = "DHCP4_PACKET_PACK_FAIL";
extern const isc::log::MessageID DHCP4_PACKET_PROCESS_EXCEPTION = "DHCP4_PACKET_PROCESS_EXCEPTION";
extern const isc::log::MessageID DHCP4_PACKET_PROCESS_STD_EXCEPTION = "DHCP4_PACKET_PROCESS_STD_EXCEPTION";
extern const isc::log::MessageID DHCP4_PACKET_QUEUE_FULL = "DHCP4_PACKET_QUEUE_FULL";
extern const isc::log::MessageID DHCP4_PACKET_RECEIVED = "DHCP4_PACKET_RECEIVED";
extern const isc::log::MessageID DHCP4_PACKET_SEND = "DHCP4_PACKET_SEND";
extern const isc::log::MessageID DHCP4_PACKET_SEND_FAIL = "DHCP4_PACKET_SEND_FAIL";
@@ -256,6 +257,7 @@ const char* values[] = {
"DHCP4_PACKET_PACK_FAIL", "%1: preparing on-wire-format of the packet to be sent failed %2",
"DHCP4_PACKET_PROCESS_EXCEPTION", "exception occurred during packet processing",
"DHCP4_PACKET_PROCESS_STD_EXCEPTION", "exception occurred during packet processing: %1",
"DHCP4_PACKET_QUEUE_FULL", "multi-threading packet queue is full",
"DHCP4_PACKET_RECEIVED", "%1: %2 (type %3) received from %4 to %5 on interface %6",
"DHCP4_PACKET_SEND", "%1: trying to send packet %2 (type %3) from %4:%5 to %6:%7 on interface %8",
"DHCP4_PACKET_SEND_FAIL", "%1: failed to send DHCPv4 packet: %2",

View File

@@ -1,4 +1,4 @@
// File created from ../../../src/bin/dhcp4/dhcp4_messages.mes on Thu Apr 30 2020 14:12
// File created from ../../../src/bin/dhcp4/dhcp4_messages.mes on Wed May 06 2020 20:50
#ifndef DHCP4_MESSAGES_H
#define DHCP4_MESSAGES_H
@@ -112,6 +112,7 @@ extern const isc::log::MessageID DHCP4_PACKET_PACK;
extern const isc::log::MessageID DHCP4_PACKET_PACK_FAIL;
extern const isc::log::MessageID DHCP4_PACKET_PROCESS_EXCEPTION;
extern const isc::log::MessageID DHCP4_PACKET_PROCESS_STD_EXCEPTION;
extern const isc::log::MessageID DHCP4_PACKET_QUEUE_FULL;
extern const isc::log::MessageID DHCP4_PACKET_RECEIVED;
extern const isc::log::MessageID DHCP4_PACKET_SEND;
extern const isc::log::MessageID DHCP4_PACKET_SEND_FAIL;

View File

@@ -615,6 +615,10 @@ during packet processing that was not caught by other, more specific
exception handlers. This packet will be dropped and the server will
continue operation.
% DHCP4_PACKET_QUEUE_FULL multi-threading packet queue is full
A debug message noting that the multi-threading packet queue is full so
the oldest packet of the queue was dropped to make room for the received one.
% DHCP4_PACKET_RECEIVED %1: %2 (type %3) received from %4 to %5 on interface %6
A debug message noting that the server has received the specified type of
packet on the specified interface. The first argument specifies the

View File

@@ -1020,7 +1020,9 @@ Dhcpv4Srv::run_one() {
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow,
this, query));
MultiThreadingMgr::instance().getThreadPool().add(call_back);
if (!MultiThreadingMgr::instance().getThreadPool().add(call_back)) {
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_BASIC, DHCP4_PACKET_QUEUE_FULL);
}
} else {
processPacketAndSendResponse(query);
}

View File

@@ -1,4 +1,4 @@
// File created from ../../../src/bin/dhcp6/dhcp6_messages.mes on Thu Apr 16 2020 10:27
// File created from ../../../src/bin/dhcp6/dhcp6_messages.mes on Wed May 06 2020 20:50
#include <cstddef>
#include <log/message_types.h>
@@ -99,6 +99,7 @@ extern const isc::log::MessageID DHCP6_PACKET_OPTIONS_SKIPPED = "DHCP6_PACKET_OP
extern const isc::log::MessageID DHCP6_PACKET_PROCESS_EXCEPTION = "DHCP6_PACKET_PROCESS_EXCEPTION";
extern const isc::log::MessageID DHCP6_PACKET_PROCESS_FAIL = "DHCP6_PACKET_PROCESS_FAIL";
extern const isc::log::MessageID DHCP6_PACKET_PROCESS_STD_EXCEPTION = "DHCP6_PACKET_PROCESS_STD_EXCEPTION";
extern const isc::log::MessageID DHCP6_PACKET_QUEUE_FULL = "DHCP6_PACKET_QUEUE_FULL";
extern const isc::log::MessageID DHCP6_PACKET_RECEIVED = "DHCP6_PACKET_RECEIVED";
extern const isc::log::MessageID DHCP6_PACKET_RECEIVE_FAIL = "DHCP6_PACKET_RECEIVE_FAIL";
extern const isc::log::MessageID DHCP6_PACKET_SEND = "DHCP6_PACKET_SEND";
@@ -245,6 +246,7 @@ const char* values[] = {
"DHCP6_PACKET_PROCESS_EXCEPTION", "exception occurred during packet processing",
"DHCP6_PACKET_PROCESS_FAIL", "processing of %1 message received from %2 failed: %3",
"DHCP6_PACKET_PROCESS_STD_EXCEPTION", "exception occurred during packet processing: %1",
"DHCP6_PACKET_QUEUE_FULL", "multi-threading packet queue is full",
"DHCP6_PACKET_RECEIVED", "%1: %2 (type %3) received from %4 to %5 on interface %6",
"DHCP6_PACKET_RECEIVE_FAIL", "error on attempt to receive packet: %1",
"DHCP6_PACKET_SEND", "%1: trying to send packet %2 (type %3) from [%4]:%5 to [%6]:%7 on interface %8",

View File

@@ -1,4 +1,4 @@
// File created from ../../../src/bin/dhcp6/dhcp6_messages.mes on Thu Apr 16 2020 10:27
// File created from ../../../src/bin/dhcp6/dhcp6_messages.mes on Wed May 06 2020 20:50
#ifndef DHCP6_MESSAGES_H
#define DHCP6_MESSAGES_H
@@ -100,6 +100,7 @@ extern const isc::log::MessageID DHCP6_PACKET_OPTIONS_SKIPPED;
extern const isc::log::MessageID DHCP6_PACKET_PROCESS_EXCEPTION;
extern const isc::log::MessageID DHCP6_PACKET_PROCESS_FAIL;
extern const isc::log::MessageID DHCP6_PACKET_PROCESS_STD_EXCEPTION;
extern const isc::log::MessageID DHCP6_PACKET_QUEUE_FULL;
extern const isc::log::MessageID DHCP6_PACKET_RECEIVED;
extern const isc::log::MessageID DHCP6_PACKET_RECEIVE_FAIL;
extern const isc::log::MessageID DHCP6_PACKET_SEND;

View File

@@ -558,6 +558,10 @@ during packet processing that was not caught by other, more specific
exception handlers. This packet will be dropped and the server will
continue operation.
% DHCP6_PACKET_QUEUE_FULL multi-threading packet queue is full
A debug message noting that the multi-threading packet queue is full so
the oldest packet of the queue was dropped to make room for the received one.
% DHCP6_PACKET_RECEIVED %1: %2 (type %3) received from %4 to %5 on interface %6
A debug message noting that the server has received the specified type of
packet on the specified interface. The first argument specifies the

View File

@@ -601,7 +601,9 @@ void Dhcpv6Srv::run_one() {
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow,
this, query));
MultiThreadingMgr::instance().getThreadPool().add(call_back);
if (!MultiThreadingMgr::instance().getThreadPool().add(call_back)) {
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_PACKET_QUEUE_FULL);
}
} else {
processPacketAndSendResponse(query);
}

View File

@@ -89,7 +89,7 @@ TEST_F(CfgMultiThreadingTest, apply) {
EXPECT_FALSE(MultiThreadingMgr::instance().getMode());
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxCount(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxQueueSize(), 0);
std::string content_json =
"{"
" \"enable-multi-threading\": true,\n"
@@ -103,7 +103,7 @@ TEST_F(CfgMultiThreadingTest, apply) {
EXPECT_TRUE(MultiThreadingMgr::instance().getMode());
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 4);
EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 64);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxCount(), 64);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxQueueSize(), 64);
}
} // namespace

View File

@@ -203,7 +203,7 @@ TEST_F(DHCPQueueControlParserTest, multiThreading) {
MultiThreadingMgr::instance().setMode(false);
} catch (const std::exception& ex) {
MultiThreadingMgr::instance().setMode(false);
ADD_FAILURE() << "parser threw an exception: " << ex.what();
FAIL() << "parser threw an exception: " << ex.what();
}
ASSERT_TRUE(queue_control);
ASSERT_TRUE(queue_control->get("enable-queue"));

View File

@@ -69,12 +69,12 @@ MultiThreadingMgr::setThreadPoolSize(uint32_t size) {
uint32_t
MultiThreadingMgr::getPacketQueueSize() {
return (thread_pool_.getMaxCount());
return (thread_pool_.getMaxQueueSize());
}
void
MultiThreadingMgr::setPacketQueueSize(uint32_t size) {
thread_pool_.setMaxCount(size);
thread_pool_.setMaxQueueSize(size);
}
uint32_t

View File

@@ -56,17 +56,17 @@ TEST(MultiThreadingMgrTest, threadPoolSize) {
TEST(MultiThreadingMgrTest, packetQueueSize) {
// default queue size is 0
EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxCount(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxQueueSize(), 0);
// set queue size to 16
EXPECT_NO_THROW(MultiThreadingMgr::instance().setPacketQueueSize(16));
// queue size should be 16
EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 16);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxCount(), 16);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxQueueSize(), 16);
// set queue size to 0
EXPECT_NO_THROW(MultiThreadingMgr::instance().setPacketQueueSize(0));
// queue size should be 0
EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxCount(), 0);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().getMaxQueueSize(), 0);
}
/// @brief Verifies that detecting thread count works.

View File

@@ -220,7 +220,9 @@ TEST_F(ThreadPoolTest, testAddAndCount) {
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
@@ -286,7 +288,9 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
@@ -324,7 +328,9 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
// add items to running thread pool
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// wait for all items to be processed
@@ -364,7 +370,9 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
@@ -415,7 +423,9 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
// add items to running thread pool
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
bool ret = true;
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// wait for all items to be processed
@@ -445,7 +455,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
}
/// @brief test ThreadPool max count
TEST_F(ThreadPoolTest, testMaxCount) {
TEST_F(ThreadPoolTest, testMaxQueueSize) {
uint32_t items_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
@@ -459,23 +469,26 @@ TEST_F(ThreadPoolTest, testMaxCount) {
call_back = std::bind(&ThreadPoolTest::run, this);
// add items to stopped thread pool
bool ret = true;
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// change the max count
ASSERT_EQ(thread_pool.getMaxCount(), 0);
size_t max_count = 10;
thread_pool.setMaxCount(max_count);
EXPECT_EQ(thread_pool.getMaxCount(), max_count);
ASSERT_EQ(thread_pool.getMaxQueueSize(), 0);
size_t max_queue_size = 10;
thread_pool.setMaxQueueSize(max_queue_size);
EXPECT_EQ(thread_pool.getMaxQueueSize(), max_queue_size);
// adding an item should squeeze the queue
EXPECT_EQ(thread_pool.count(), items_count);
EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_EQ(thread_pool.count(), max_count);
EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
EXPECT_FALSE(ret);
EXPECT_EQ(thread_pool.count(), max_queue_size);
}
} // namespace

View File

@@ -78,8 +78,10 @@ struct ThreadPool {
/// @brief add a work item to the thread pool
///
/// @param item the 'functor' object to be added to the queue
void add(const WorkItemPtr& item) {
queue_.push(item);
/// @return false if the queue was full and oldest item(s) was dropped,
/// true otherwise.
bool add(const WorkItemPtr& item) {
return (queue_.push(item));
}
/// @brief count number of work items in the queue
@@ -91,16 +93,16 @@ struct ThreadPool {
/// @brief set maximum number of work items in the queue
///
/// @param max_count the maximum count (0 means unlimited)
void setMaxCount(size_t max_count) {
queue_.setMaxCount(max_count);
/// @param max_queue_size the maximum count (0 means unlimited)
void setMaxQueueSize(size_t max_queue_size) {
queue_.setMaxQueueSize(max_queue_size);
}
/// @brief get maximum number of work items in the queue
///
/// @return the maximum count (0 means unlimited)
size_t getMaxCount() {
return (queue_.getMaxCount());
size_t getMaxQueueSize() {
return (queue_.getMaxQueueSize());
}
/// @brief size number of thread pool threads
@@ -163,7 +165,7 @@ private:
/// @brief Constructor
///
/// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue() : enabled_(false), max_count_(0) {
ThreadPoolQueue() : enabled_(false), max_queue_size_(0) {
}
/// @brief Destructor
@@ -177,17 +179,17 @@ private:
/// @brief get maximum number of work items in the queue
///
/// @return the maximum count (0 means unlimited)
void setMaxCount(size_t max_count) {
void setMaxQueueSize(size_t max_queue_size) {
std::lock_guard<std::mutex> lock(mutex_);
max_count_ = max_count;
max_queue_size_ = max_queue_size;
}
/// @brief get maximum number of work items in the queue
///
/// @return the maximum count (0 means unlimited)
size_t getMaxCount() {
size_t getMaxQueueSize() {
std::lock_guard<std::mutex> lock(mutex_);
return (max_count_);
return (max_queue_size_);
}
/// @brief push work item to the queue
@@ -196,23 +198,30 @@ private:
/// When the queue is full oldest items are removed.
/// This function adds an item to the queue and wakes up at least one thread
/// waiting on the queue.
/// When the queue is full oldest item(s) is dropped and false returned.
///
/// @param item the new item to be added to the queue
void push(const Item& item) {
/// @return true if the queue was not full and oldest item(s) dropped.
bool push(const Item& item) {
bool ret = true;
if (!item) {
return;
return (ret);
}
{
std::lock_guard<std::mutex> lock(mutex_);
if (max_count_ > 0) {
while (queue_.size() >= max_count_) {
if (max_queue_size_ > 0) {
while (queue_.size() >= max_queue_size_) {
queue_.pop();
if (ret) {
ret = false;
}
}
}
queue_.push(item);
}
// Notify pop function so that it can effectively remove a work item.
cv_.notify_one();
return (ret);
}
/// @brief pop work item from the queue or block waiting
@@ -301,7 +310,7 @@ private:
/// @brief maximum number of work items in the queue
/// (0 means unlimited)
size_t max_count_;
size_t max_queue_size_;
};
/// @brief run function of each thread