2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-31 05:55:28 +00:00

[#1147] Checkpoint: implemented queue of one for v6 client

This commit is contained in:
Francis Dupont
2020-05-09 18:04:23 +02:00
parent 25208e91c4
commit fd50371a22
5 changed files with 354 additions and 13 deletions

View File

@@ -10,8 +10,10 @@
#include <dhcp6/dhcp6_log.h>
#include <exceptions/exceptions.h>
#include <stats/stats_mgr.h>
#include <util/multi_threading_mgr.h>
using namespace std;
using namespace isc::util;
namespace isc {
namespace dhcp {
@@ -71,10 +73,22 @@ ClientHandler::unLock() {
}
// Assume erase will never fail so not checking its result.
clients_.erase(locked_->getDuid());
if (!client_ || !client_->cont_) {
return;
}
// Try to process next query. As the caller holds the mutex of
// the handler class the continuation will be resumed after.
MultiThreadingMgr& mt_mgr = MultiThreadingMgr::instance();
if (mt_mgr.getMode()) {
if (!mt_mgr.getThreadPool().addFront(client_->cont_)) {
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_PACKET_QUEUE_FULL);
}
}
client_->cont_.reset();
}
bool
ClientHandler::tryLock(Pkt6Ptr query) {
ClientHandler::tryLock(Pkt6Ptr query, ContinuationPtr cont) {
if (!query) {
isc_throw(InvalidParameter, "null query in ClientHandler::tryLock");
}
@@ -102,7 +116,25 @@ ClientHandler::tryLock(Pkt6Ptr query) {
return (false);
}
}
// This query is a duplicate: currently it is simply dropped.
// This query can be a duplicate so put the continuation.
if (cont) {
Pkt6Ptr next_query = holder->next_query_;
holder->next_query_ = query;
holder->cont_ = cont;
if (next_query) {
// Logging a warning as it is supposed to be a rare event
// with well behaving clients...
LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE)
.arg(next_query->toText())
.arg(this_thread::get_id())
.arg(query->toText())
.arg(this_thread::get_id());
stats::StatsMgr::instance().addValue("pkt6-receive-drop",
static_cast<int64_t>(1));
}
return (false);
}
// Logging a warning as it is supposed to be a rare event
// with well behaving clients...
LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE)

View File

@@ -13,6 +13,7 @@
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <functional>
#include <mutex>
#include <thread>
@@ -23,6 +24,19 @@ namespace dhcp {
class ClientHandler : public boost::noncopyable {
public:
/// @brief Define the type of packet processing continuation.
typedef std::function<void()> Continuation;
/// @brief Define the type of shared pointers to continuations.
typedef boost::shared_ptr<Continuation> ContinuationPtr;
/// @brief Continuation factory.
///
/// @param cont Continuation rvalue.
static ContinuationPtr makeContinuation(Continuation&& cont) {
return (boost::make_shared<Continuation>(cont));
}
/// @brief Constructor.
ClientHandler();
@@ -33,10 +47,15 @@ public:
/// @brief Tries to acquires a client.
///
/// Lookup the client:
/// - if not found insert the client in the clients map and return true
/// - if found put the continuation in the holder and return false
///
/// @param query The query from the client.
/// @param cont The continuation in the case the client was held.
/// @return true if the client was acquired, false if there is already
/// a query from the same client.
bool tryLock(Pkt6Ptr query);
bool tryLock(Pkt6Ptr query, ContinuationPtr cont = ContinuationPtr());
private:
@@ -58,6 +77,12 @@ private:
/// @brief The ID of the thread processing the query.
std::thread::id thread_;
/// @brief The next query.
Pkt6Ptr next_query_;
/// @brief The continuation to process next query for the client.
ContinuationPtr cont_;
};
/// @brief The type of shared pointers to clients.
@@ -87,6 +112,9 @@ private:
/// @brief Release a client.
///
/// If the client has a continuation, push it at front of the thread
/// packet queue.
///
/// The mutex must be held by the caller.
void unLock();

View File

@@ -9,6 +9,8 @@
#include <dhcp6/client_handler.h>
#include <dhcp6/tests/dhcp6_test_utils.h>
#include <stats/stats_mgr.h>
#include <util/multi_threading_mgr.h>
#include <unistd.h>
using namespace isc;
using namespace isc::dhcp;
@@ -25,7 +27,8 @@ public:
/// @brief Constructor.
///
/// Creates the pkt6-receive-drop statistic.
ClientHandleTest() {
ClientHandleTest() : called1_(false), called2_(false), called3_(false) {
MultiThreadingMgr::instance().apply(false, 0, 0);
StatsMgr::instance().setValue("pkt6-receive-drop", static_cast<int64_t>(0));
}
@@ -33,6 +36,7 @@ public:
///
/// Removes statistics.
~ClientHandleTest() {
MultiThreadingMgr::instance().apply(false, 0, 0);
StatsMgr::instance().removeAll();
}
@@ -63,6 +67,38 @@ public:
EXPECT_EQ(0, obs->getInteger().first);
}
}
/// @brief Waits for pending continuations.
void waitForThreads() {
while (MultiThreadingMgr::instance().getThreadPool().count() > 0) {
usleep(100);
}
}
/// @brief Set called1_ to true.
void setCalled1() {
called1_ = true;
}
/// @brief Set called2_ to true.
void setCalled2() {
called2_ = true;
}
/// @brief Set called3_ to true.
void setCalled3() {
called3_ = true;
}
/// @brief The called flag number 1.
bool called1_;
/// @brief The called flag number 2.
bool called2_;
/// @brief The called flag number 3.
bool called3_;
};
// Verifies behavior with empty block.
@@ -73,6 +109,7 @@ TEST_F(ClientHandleTest, empty) {
} catch (const std::exception& ex) {
ADD_FAILURE() << "unexpected exception: " << ex.what();
}
checkStat(false);
}
// Verifies behavior with one query.
@@ -283,4 +320,179 @@ TEST_F(ClientHandleTest, doubleTryLock) {
// Cannot verifies that empty client ID fails because getClientId() handles
// this condition and replaces it by no client ID.
// Verifies behavior with two queries for the same client and multi-threading.
TEST_F(ClientHandleTest, serializeTwoQueries) {
// Get two queries.
Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
OptionPtr client_id = generateClientId();
// Same client ID: same client.
sol->addOption(client_id);
req->addOption(client_id);
// Start multi-threading.
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
try {
// Get a client handler.
ClientHandler client_handler;
// Create a continuation.
ClientHandler::ContinuationPtr cont1 =
ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this));
// Try to lock it with the solicit.
bool duplicate = false;
EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1));
// Should return false (no duplicate).
EXPECT_FALSE(duplicate);
// Get a second client handler.
ClientHandler client_handler2;
// Create a continuation.
ClientHandler::ContinuationPtr cont2 =
ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this));
// Try to lock it with a request.
EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2));
// Should return false (multi-threading enforces serialization).
EXPECT_FALSE(duplicate);
} catch (const std::exception& ex) {
ADD_FAILURE() << "unexpected exception: " << ex.what();
}
// Give the second continuation a chance.
waitForThreads();
// Force multi-threading to stop;
MultiThreadingCriticalSection cs;
checkStat(false);
EXPECT_FALSE(called1_);
EXPECT_TRUE(called2_);
}
// Verifies behavior with two queries for the same client and multi-threading.
// Continuations are required for serialization.
TEST_F(ClientHandleTest, serializeNoCont) {
// Get two queries.
Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
OptionPtr client_id = generateClientId();
// Same client ID: same client.
sol->addOption(client_id);
req->addOption(client_id);
// Start multi-threading.
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
try {
// Get a client handler.
ClientHandler client_handler;
// Try to lock it with the solicit.
bool duplicate = false;
EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol));
// Should return false (no duplicate).
EXPECT_FALSE(duplicate);
// Get a second client handler.
ClientHandler client_handler2;
// Try to lock it with a request.
EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req));
// Should return true (duplicate without continuation).
EXPECT_TRUE(duplicate);
} catch (const std::exception& ex) {
ADD_FAILURE() << "unexpected exception: " << ex.what();
}
// Give the second continuation a chance even there is none...
waitForThreads();
// Force multi-threading to stop;
MultiThreadingCriticalSection cs;
checkStat(true);
}
// Verifies behavior with three queries for the same client and
// multi-threading: currently we accept only two queries,
// a third one replaces second so we get the first (oldest) query and
// the last (newest) query when the client is busy.
TEST_F(ClientHandleTest, serializeThreeQueries) {
// Get two queries.
Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
Pkt6Ptr ren(new Pkt6(DHCPV6_RENEW, 3456));
OptionPtr client_id = generateClientId();
// Same client ID: same client.
sol->addOption(client_id);
req->addOption(client_id);
ren->addOption(client_id);
// Start multi-threading.
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
try {
// Get a client handler.
ClientHandler client_handler;
// Create a continuation.
ClientHandler::ContinuationPtr cont1 =
ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this));
// Try to lock it with the solicit.
bool duplicate = false;
EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1));
// Should return false (no duplicate).
EXPECT_FALSE(duplicate);
// Get a second client handler.
ClientHandler client_handler2;
// Create a continuation.
ClientHandler::ContinuationPtr cont2 =
ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this));
// Try to lock it with a request.
EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2));
// Should return false (multi-threading enforces serialization).
EXPECT_FALSE(duplicate);
// Get a third client handler.
ClientHandler client_handler3;
// Create a continuation.
ClientHandler::ContinuationPtr cont3 =
ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled3, this));
// Try to lock it with a renew.
EXPECT_NO_THROW(duplicate = client_handler3.tryLock(ren, cont3));
// Should return false (multi-threading enforces serialization).
EXPECT_FALSE(duplicate);
} catch (const std::exception& ex) {
ADD_FAILURE() << "unexpected exception: " << ex.what();
}
// Give the second continuation a chance.
waitForThreads();
// Force multi-threading to stop;
MultiThreadingCriticalSection cs;
checkStat(true);
EXPECT_FALSE(called1_);
EXPECT_FALSE(called2_);
EXPECT_TRUE(called3_);
}
} // end of anonymous namespace

View File

@@ -205,7 +205,7 @@ private:
};
/// @brief test ThreadPool add and count
TEST_F(ThreadPoolTest, testAddAndCount) {
TEST_F(ThreadPoolTest, addAndCount) {
uint32_t items_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
@@ -237,7 +237,7 @@ TEST_F(ThreadPoolTest, testAddAndCount) {
}
/// @brief test ThreadPool start and stop
TEST_F(ThreadPoolTest, testStartAndStop) {
TEST_F(ThreadPoolTest, startAndStop) {
uint32_t items_count;
uint32_t thread_count;
CallBack call_back;
@@ -455,7 +455,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
}
/// @brief test ThreadPool max queue size
TEST_F(ThreadPoolTest, testMaxQueueSize) {
TEST_F(ThreadPoolTest, maxQueueSize) {
uint32_t items_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
@@ -491,4 +491,41 @@ TEST_F(ThreadPoolTest, testMaxQueueSize) {
EXPECT_EQ(thread_pool.count(), max_queue_size);
}
/// @brief test ThreadPool add front.
TEST_F(ThreadPoolTest, addFront) {
uint32_t items_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
items_count = 20;
call_back = std::bind(&ThreadPoolTest::run, this);
// add items to stopped thread pool
bool ret = true;
for (uint32_t i = 0; i < items_count; ++i) {
EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared<CallBack>(call_back)));
EXPECT_TRUE(ret);
}
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// change the max count
ASSERT_EQ(thread_pool.getMaxQueueSize(), 0);
size_t max_queue_size = 10;
thread_pool.setMaxQueueSize(max_queue_size);
EXPECT_EQ(thread_pool.getMaxQueueSize(), max_queue_size);
// adding an item at front should change nothing queue
EXPECT_EQ(thread_pool.count(), items_count);
EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared<CallBack>(call_back)));
EXPECT_FALSE(ret);
EXPECT_EQ(thread_pool.count(), items_count);
}
} // namespace

View File

@@ -26,7 +26,7 @@ namespace util {
///
/// @tparam WorkItem a functor
/// @tparam Container a 'queue like' container
template <typename WorkItem, typename Container = std::queue<boost::shared_ptr<WorkItem>>>
template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
struct ThreadPool {
typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
@@ -81,7 +81,15 @@ struct ThreadPool {
/// @return false if the queue was full and oldest item(s) was dropped,
/// true otherwise.
bool add(const WorkItemPtr& item) {
return (queue_.push(item));
return (queue_.push_back(item));
}
/// @brief add a work item to the thread pool at front
///
/// @param item the 'functor' object to be added to the queue
/// @return false if the queue was full, true otherwise.
bool addFront(const WorkItemPtr& item) {
return (queue_.push_front(item));
}
/// @brief count number of work items in the queue
@@ -203,7 +211,7 @@ private:
/// @param item the new item to be added to the queue
/// @return false if the queue was full and oldest item(s) dropped,
/// true otherwise
bool push(const Item& item) {
bool push_back(const Item& item) {
bool ret = true;
if (!item) {
return (ret);
@@ -212,17 +220,41 @@ private:
std::lock_guard<std::mutex> lock(mutex_);
if (max_queue_size_ != 0) {
while (queue_.size() >= max_queue_size_) {
queue_.pop();
queue_.pop_front();
ret = false;
}
}
queue_.push(item);
queue_.push_back(item);
}
// Notify pop function so that it can effectively remove a work item.
cv_.notify_one();
return (ret);
}
/// @brief push work item to the queue at front.
///
/// Used to add work items to the queue at front.
/// When the queue is full the item is not added.
///
/// @param item the new item to be added to the queue
/// @return false if the queue was full, true otherwise
bool push_front(const Item& item) {
if (!item) {
return (true);
}
{
std::lock_guard<std::mutex> lock(mutex_);
if ((max_queue_size_ != 0) &&
(queue_.size() >= max_queue_size_)) {
return (false);
}
queue_.push_front(item);
}
// Notify pop function so that it can effectively remove a work item.
cv_.notify_one();
return (true);
}
/// @brief pop work item from the queue or block waiting
///
/// Used to retrieve and remove a work item from the queue
@@ -241,7 +273,7 @@ private:
return (Item());
}
Item item = queue_.front();
queue_.pop();
queue_.pop_front();
return (item);
}