diff --git a/src/bin/dhcp6/client_handler.cc b/src/bin/dhcp6/client_handler.cc index 9cbd5782bc..e2eba6f62e 100644 --- a/src/bin/dhcp6/client_handler.cc +++ b/src/bin/dhcp6/client_handler.cc @@ -10,8 +10,10 @@ #include #include #include +#include using namespace std; +using namespace isc::util; namespace isc { namespace dhcp { @@ -71,10 +73,22 @@ ClientHandler::unLock() { } // Assume erase will never fail so not checking its result. clients_.erase(locked_->getDuid()); + if (!client_ || !client_->cont_) { + return; + } + // Try to process next query. As the caller holds the mutex of + // the handler class the continuation will be resumed after. + MultiThreadingMgr& mt_mgr = MultiThreadingMgr::instance(); + if (mt_mgr.getMode()) { + if (!mt_mgr.getThreadPool().addFront(client_->cont_)) { + LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_PACKET_QUEUE_FULL); + } + } + client_->cont_.reset(); } bool -ClientHandler::tryLock(Pkt6Ptr query) { +ClientHandler::tryLock(Pkt6Ptr query, ContinuationPtr cont) { if (!query) { isc_throw(InvalidParameter, "null query in ClientHandler::tryLock"); } @@ -102,7 +116,25 @@ ClientHandler::tryLock(Pkt6Ptr query) { return (false); } } - // This query is a duplicate: currently it is simply dropped. + // This query can be a duplicate so put the continuation. + if (cont) { + Pkt6Ptr next_query = holder->next_query_; + holder->next_query_ = query; + holder->cont_ = cont; + if (next_query) { + // Logging a warning as it is supposed to be a rare event + // with well behaving clients... + LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE) + .arg(next_query->toText()) + .arg(this_thread::get_id()) + .arg(query->toText()) + .arg(this_thread::get_id()); + stats::StatsMgr::instance().addValue("pkt6-receive-drop", + static_cast(1)); + } + return (false); + } + // Logging a warning as it is supposed to be a rare event // with well behaving clients... LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE) diff --git a/src/bin/dhcp6/client_handler.h b/src/bin/dhcp6/client_handler.h index cb31196524..01f0dd1130 100644 --- a/src/bin/dhcp6/client_handler.h +++ b/src/bin/dhcp6/client_handler.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,19 @@ namespace dhcp { class ClientHandler : public boost::noncopyable { public: + /// @brief Define the type of packet processing continuation. + typedef std::function Continuation; + + /// @brief Define the type of shared pointers to continuations. + typedef boost::shared_ptr ContinuationPtr; + + /// @brief Continuation factory. + /// + /// @param cont Continuation rvalue. + static ContinuationPtr makeContinuation(Continuation&& cont) { + return (boost::make_shared(cont)); + } + /// @brief Constructor. ClientHandler(); @@ -33,10 +47,15 @@ public: /// @brief Tries to acquires a client. /// + /// Lookup the client: + /// - if not found insert the client in the clients map and return true + /// - if found put the continuation in the holder and return false + /// /// @param query The query from the client. + /// @param cont The continuation in the case the client was held. /// @return true if the client was acquired, false if there is already /// a query from the same client. - bool tryLock(Pkt6Ptr query); + bool tryLock(Pkt6Ptr query, ContinuationPtr cont = ContinuationPtr()); private: @@ -58,6 +77,12 @@ private: /// @brief The ID of the thread processing the query. std::thread::id thread_; + + /// @brief The next query. + Pkt6Ptr next_query_; + + /// @brief The continuation to process next query for the client. + ContinuationPtr cont_; }; /// @brief The type of shared pointers to clients. @@ -87,6 +112,9 @@ private: /// @brief Release a client. /// + /// If the client has a continuation, push it at front of the thread + /// packet queue. + /// /// The mutex must be held by the caller. void unLock(); diff --git a/src/bin/dhcp6/tests/client_handler_unittest.cc b/src/bin/dhcp6/tests/client_handler_unittest.cc index 469e5c6508..8f6f1f36ec 100644 --- a/src/bin/dhcp6/tests/client_handler_unittest.cc +++ b/src/bin/dhcp6/tests/client_handler_unittest.cc @@ -9,6 +9,8 @@ #include #include #include +#include +#include using namespace isc; using namespace isc::dhcp; @@ -25,7 +27,8 @@ public: /// @brief Constructor. /// /// Creates the pkt6-receive-drop statistic. - ClientHandleTest() { + ClientHandleTest() : called1_(false), called2_(false), called3_(false) { + MultiThreadingMgr::instance().apply(false, 0, 0); StatsMgr::instance().setValue("pkt6-receive-drop", static_cast(0)); } @@ -33,6 +36,7 @@ public: /// /// Removes statistics. ~ClientHandleTest() { + MultiThreadingMgr::instance().apply(false, 0, 0); StatsMgr::instance().removeAll(); } @@ -63,6 +67,38 @@ public: EXPECT_EQ(0, obs->getInteger().first); } } + + /// @brief Waits for pending continuations. + void waitForThreads() { + while (MultiThreadingMgr::instance().getThreadPool().count() > 0) { + usleep(100); + } + } + + /// @brief Set called1_ to true. + void setCalled1() { + called1_ = true; + } + + /// @brief Set called2_ to true. + void setCalled2() { + called2_ = true; + } + + /// @brief Set called3_ to true. + void setCalled3() { + called3_ = true; + } + + /// @brief The called flag number 1. + bool called1_; + + /// @brief The called flag number 2. + bool called2_; + + /// @brief The called flag number 3. + bool called3_; + }; // Verifies behavior with empty block. @@ -73,6 +109,7 @@ TEST_F(ClientHandleTest, empty) { } catch (const std::exception& ex) { ADD_FAILURE() << "unexpected exception: " << ex.what(); } + checkStat(false); } // Verifies behavior with one query. @@ -283,4 +320,179 @@ TEST_F(ClientHandleTest, doubleTryLock) { // Cannot verifies that empty client ID fails because getClientId() handles // this condition and replaces it by no client ID. +// Verifies behavior with two queries for the same client and multi-threading. +TEST_F(ClientHandleTest, serializeTwoQueries) { + // Get two queries. + Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234)); + Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345)); + OptionPtr client_id = generateClientId(); + // Same client ID: same client. + sol->addOption(client_id); + req->addOption(client_id); + + // Start multi-threading. + EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0)); + + try { + // Get a client handler. + ClientHandler client_handler; + + // Create a continuation. + ClientHandler::ContinuationPtr cont1 = + ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this)); + + // Try to lock it with the solicit. + bool duplicate = false; + EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1)); + + // Should return false (no duplicate). + EXPECT_FALSE(duplicate); + + // Get a second client handler. + ClientHandler client_handler2; + + // Create a continuation. + ClientHandler::ContinuationPtr cont2 = + ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this)); + + // Try to lock it with a request. + EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2)); + + // Should return false (multi-threading enforces serialization). + EXPECT_FALSE(duplicate); + } catch (const std::exception& ex) { + ADD_FAILURE() << "unexpected exception: " << ex.what(); + } + + // Give the second continuation a chance. + waitForThreads(); + + // Force multi-threading to stop; + MultiThreadingCriticalSection cs; + + checkStat(false); + EXPECT_FALSE(called1_); + EXPECT_TRUE(called2_); +} + +// Verifies behavior with two queries for the same client and multi-threading. +// Continuations are required for serialization. +TEST_F(ClientHandleTest, serializeNoCont) { + // Get two queries. + Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234)); + Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345)); + OptionPtr client_id = generateClientId(); + // Same client ID: same client. + sol->addOption(client_id); + req->addOption(client_id); + + // Start multi-threading. + EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0)); + + try { + // Get a client handler. + ClientHandler client_handler; + + // Try to lock it with the solicit. + bool duplicate = false; + EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol)); + + // Should return false (no duplicate). + EXPECT_FALSE(duplicate); + + // Get a second client handler. + ClientHandler client_handler2; + + // Try to lock it with a request. + EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req)); + + // Should return true (duplicate without continuation). + EXPECT_TRUE(duplicate); + } catch (const std::exception& ex) { + ADD_FAILURE() << "unexpected exception: " << ex.what(); + } + + // Give the second continuation a chance even there is none... + waitForThreads(); + + // Force multi-threading to stop; + MultiThreadingCriticalSection cs; + + checkStat(true); +} + +// Verifies behavior with three queries for the same client and +// multi-threading: currently we accept only two queries, +// a third one replaces second so we get the first (oldest) query and +// the last (newest) query when the client is busy. +TEST_F(ClientHandleTest, serializeThreeQueries) { + // Get two queries. + Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234)); + Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345)); + Pkt6Ptr ren(new Pkt6(DHCPV6_RENEW, 3456)); + OptionPtr client_id = generateClientId(); + // Same client ID: same client. + sol->addOption(client_id); + req->addOption(client_id); + ren->addOption(client_id); + + // Start multi-threading. + EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0)); + + try { + // Get a client handler. + ClientHandler client_handler; + + // Create a continuation. + ClientHandler::ContinuationPtr cont1 = + ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this)); + + // Try to lock it with the solicit. + bool duplicate = false; + EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1)); + + // Should return false (no duplicate). + EXPECT_FALSE(duplicate); + + // Get a second client handler. + ClientHandler client_handler2; + + // Create a continuation. + ClientHandler::ContinuationPtr cont2 = + ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this)); + + // Try to lock it with a request. + EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2)); + + // Should return false (multi-threading enforces serialization). + EXPECT_FALSE(duplicate); + + // Get a third client handler. + ClientHandler client_handler3; + + // Create a continuation. + ClientHandler::ContinuationPtr cont3 = + ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled3, this)); + + // Try to lock it with a renew. + EXPECT_NO_THROW(duplicate = client_handler3.tryLock(ren, cont3)); + + // Should return false (multi-threading enforces serialization). + EXPECT_FALSE(duplicate); + } catch (const std::exception& ex) { + ADD_FAILURE() << "unexpected exception: " << ex.what(); + } + + // Give the second continuation a chance. + waitForThreads(); + + // Force multi-threading to stop; + MultiThreadingCriticalSection cs; + + checkStat(true); + EXPECT_FALSE(called1_); + EXPECT_FALSE(called2_); + EXPECT_TRUE(called3_); +} + } // end of anonymous namespace diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index c2902bc0f7..41daecc855 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -205,7 +205,7 @@ private: }; /// @brief test ThreadPool add and count -TEST_F(ThreadPoolTest, testAddAndCount) { +TEST_F(ThreadPoolTest, addAndCount) { uint32_t items_count; CallBack call_back; ThreadPool thread_pool; @@ -237,7 +237,7 @@ TEST_F(ThreadPoolTest, testAddAndCount) { } /// @brief test ThreadPool start and stop -TEST_F(ThreadPoolTest, testStartAndStop) { +TEST_F(ThreadPoolTest, startAndStop) { uint32_t items_count; uint32_t thread_count; CallBack call_back; @@ -455,7 +455,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { } /// @brief test ThreadPool max queue size -TEST_F(ThreadPoolTest, testMaxQueueSize) { +TEST_F(ThreadPoolTest, maxQueueSize) { uint32_t items_count; CallBack call_back; ThreadPool thread_pool; @@ -491,4 +491,41 @@ TEST_F(ThreadPoolTest, testMaxQueueSize) { EXPECT_EQ(thread_pool.count(), max_queue_size); } +/// @brief test ThreadPool add front. +TEST_F(ThreadPoolTest, addFront) { + uint32_t items_count; + CallBack call_back; + ThreadPool thread_pool; + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 20; + + call_back = std::bind(&ThreadPoolTest::run, this); + + // add items to stopped thread pool + bool ret = true; + for (uint32_t i = 0; i < items_count; ++i) { + EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + + // change the max count + ASSERT_EQ(thread_pool.getMaxQueueSize(), 0); + size_t max_queue_size = 10; + thread_pool.setMaxQueueSize(max_queue_size); + EXPECT_EQ(thread_pool.getMaxQueueSize(), max_queue_size); + + // adding an item at front should change nothing queue + EXPECT_EQ(thread_pool.count(), items_count); + EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared(call_back))); + EXPECT_FALSE(ret); + EXPECT_EQ(thread_pool.count(), items_count); +} + } // namespace diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index ccad7bdf37..fdc29ee7fe 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -26,7 +26,7 @@ namespace util { /// /// @tparam WorkItem a functor /// @tparam Container a 'queue like' container -template >> +template >> struct ThreadPool { typedef typename boost::shared_ptr WorkItemPtr; @@ -81,7 +81,15 @@ struct ThreadPool { /// @return false if the queue was full and oldest item(s) was dropped, /// true otherwise. bool add(const WorkItemPtr& item) { - return (queue_.push(item)); + return (queue_.push_back(item)); + } + + /// @brief add a work item to the thread pool at front + /// + /// @param item the 'functor' object to be added to the queue + /// @return false if the queue was full, true otherwise. + bool addFront(const WorkItemPtr& item) { + return (queue_.push_front(item)); } /// @brief count number of work items in the queue @@ -203,7 +211,7 @@ private: /// @param item the new item to be added to the queue /// @return false if the queue was full and oldest item(s) dropped, /// true otherwise - bool push(const Item& item) { + bool push_back(const Item& item) { bool ret = true; if (!item) { return (ret); @@ -212,17 +220,41 @@ private: std::lock_guard lock(mutex_); if (max_queue_size_ != 0) { while (queue_.size() >= max_queue_size_) { - queue_.pop(); + queue_.pop_front(); ret = false; } } - queue_.push(item); + queue_.push_back(item); } // Notify pop function so that it can effectively remove a work item. cv_.notify_one(); return (ret); } + /// @brief push work item to the queue at front. + /// + /// Used to add work items to the queue at front. + /// When the queue is full the item is not added. + /// + /// @param item the new item to be added to the queue + /// @return false if the queue was full, true otherwise + bool push_front(const Item& item) { + if (!item) { + return (true); + } + { + std::lock_guard lock(mutex_); + if ((max_queue_size_ != 0) && + (queue_.size() >= max_queue_size_)) { + return (false); + } + queue_.push_front(item); + } + // Notify pop function so that it can effectively remove a work item. + cv_.notify_one(); + return (true); + } + /// @brief pop work item from the queue or block waiting /// /// Used to retrieve and remove a work item from the queue @@ -241,7 +273,7 @@ private: return (Item()); } Item item = queue_.front(); - queue_.pop(); + queue_.pop_front(); return (item); }